You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/06 10:39:40 UTC
[incubator-pinot] 01/01: Adding template support for Pinot Ingestion Job Spec
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ecd43a9e30720e7c9f67aee4243fdb619a55b42d
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 6 03:38:59 2020 -0700
Adding template support for Pinot Ingestion Job Spec
---
pinot-spi/pom.xml | 4 +
.../spi/ingestion/batch/IngestionJobLauncher.java | 22 ++++--
.../apache/pinot/spi/utils/JinjaTemplateUtils.java | 66 +++++++++++++++++
.../ingestion/batch/IngestionJobLauncherTest.java | 42 +++++++++++
.../pinot/spi/utils/JinjaTemplateUtilsTest.java | 85 ++++++++++++++++++++++
.../test/resources/ingestionJobSpecTemplate.yaml | 45 ++++++++++++
.../command/LaunchDataIngestionJobCommand.java | 19 ++++-
pom.xml | 6 ++
8 files changed, 283 insertions(+), 6 deletions(-)
diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index 8ad0aec..b1a4b8b 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -100,6 +100,10 @@
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
+ <groupId>com.hubspot.jinjava</groupId>
+ <artifactId>jinjava</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9bb740a..c3ae5ea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -20,13 +20,17 @@ package org.apache.pinot.spi.ingestion.batch;
import java.io.BufferedReader;
import java.io.FileReader;
-import java.io.Reader;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
@@ -49,11 +53,33 @@ public class IngestionJobLauncher {
System.exit(1);
}
String jobSpecFilePath = args[0];
+ // Remaining arguments are "key=value" template values. Arrays.asList returns a fixed-size list, so
+ // remove(0) would throw UnsupportedOperationException; take a subList view instead.
+ List<String> valueList = Arrays.asList(args).subList(1, args.length);
+ SegmentGenerationJobSpec spec =
+ getSegmentGenerationJobSpec(jobSpecFilePath, JinjaTemplateUtils.getTemplateContext(valueList));
+ runIngestionJob(spec);
+ }
- try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
- SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
- runIngestionJob(spec);
- }
+ /**
+ * Reads the job spec file, renders it as a Jinja template with {@code context}, and parses the
+ * rendered YAML into a {@link SegmentGenerationJobSpec}.
+ *
+ * @param jobSpecFilePath path to the (possibly templated) YAML job spec file
+ * @param context template values, applied on top of JinjaTemplateUtils' default context
+ * @return the parsed job spec
+ * @throws IOException if the job spec file cannot be read
+ */
+ public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath,
+ Map<String, Object> context)
+ throws IOException {
+ String yamlTemplate;
+ // try-with-resources releases the file handle even if reading fails.
+ try (BufferedReader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+ yamlTemplate = IOUtils.toString(reader);
+ }
+ String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+ return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
}
public static void runIngestionJob(SegmentGenerationJobSpec spec)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
new file mode 100644
index 0000000..76be359
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.fasterxml.jackson.databind.util.StdDateFormat;
+import com.hubspot.jinjava.Jinjava;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Utilities for rendering Jinja templates with Jinjava, e.g. to fill in templated ingestion job specs.
+ */
+public class JinjaTemplateUtils {
+
+  private static final Jinjava JINJAVA = new Jinjava();
+
+  /**
+   * Renders {@code template} with the default context (see {@link #getDefaultJinjaContextMap()})
+   * overlaid by {@code newContext}; entries in {@code newContext} win on key collisions.
+   *
+   * @param template Jinja template text
+   * @param newContext additional template variables; {@code null} is treated as empty
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template, Map<String, Object> newContext) {
+    Map<String, Object> contextMap = getDefaultJinjaContextMap();
+    if (newContext != null) {
+      contextMap.putAll(newContext);
+    }
+    return JINJAVA.render(template, contextMap);
+  }
+
+  /**
+   * Builds the default template context:
+   * <ul>
+   *   <li>{@code now}: the current {@link Instant}</li>
+   *   <li>{@code today}: that instant formatted with Jackson's {@link StdDateFormat}</li>
+   *   <li>{@code yesterday}: that instant minus one day, formatted the same way</li>
+   * </ul>
+   * All three entries derive from a single clock reading, so they are mutually consistent.
+   *
+   * @return a new, mutable map
+   */
+  public static Map<String, Object> getDefaultJinjaContextMap() {
+    Instant now = Instant.now();
+    // java.text.DateFormat implementations (StdDateFormat included) are not thread-safe, so create
+    // one per call instead of sharing a static instance across concurrent callers.
+    StdDateFormat dateFormat = new StdDateFormat();
+    Map<String, Object> defaultJinjaContextMap = new HashMap<>();
+    defaultJinjaContextMap.put("now", now);
+    defaultJinjaContextMap.put("today", dateFormat.format(Date.from(now)));
+    defaultJinjaContextMap.put("yesterday", dateFormat.format(Date.from(now.minus(1, ChronoUnit.DAYS))));
+    return defaultJinjaContextMap;
+  }
+
+  /**
+   * Parses {@code key=value} strings into a template context. Each string is split on its first
+   * {@code '='}, so values may themselves contain {@code '='}. Strings without any {@code '='} are
+   * ignored, and a later duplicate key overwrites an earlier one.
+   *
+   * @param values {@code key=value} strings, e.g. {@code year=2020}; {@code null} yields an empty map
+   * @return a new, mutable map from key to (String) value
+   */
+  public static Map<String, Object> getTemplateContext(List<String> values) {
+    Map<String, Object> context = new HashMap<>();
+    if (values == null) {
+      return context;
+    }
+    for (String value : values) {
+      String[] splits = value.split("=", 2);
+      if (splits.length > 1) {
+        context.put(splits[0], splits[1]);
+      }
+    }
+    return context;
+  }
+
+  /**
+   * Renders {@code template} using only the default context (see {@link #getDefaultJinjaContextMap()}).
+   *
+   * @param template Jinja template text
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template) {
+    return renderTemplate(template, Collections.emptyMap());
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
new file mode 100644
index 0000000..7d54f0f
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.batch;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class IngestionJobLauncherTest {
+
+  @Test
+  public void testIngestionJobLauncherWithTemplate()
+      throws Exception {
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    // URL#getFile() returns a percent-encoded path (e.g. a space becomes "%20") that FileReader cannot
+    // open; converting through the URI yields a real filesystem path.
+    URL resource = JinjaTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml");
+    Assert.assertNotNull(resource, "ingestionJobSpecTemplate.yaml is missing from the test classpath");
+    String jobSpecFilePath = new File(resource.toURI()).getAbsolutePath();
+    SegmentGenerationJobSpec spec = IngestionJobLauncher.getSegmentGenerationJobSpec(jobSpecFilePath, context);
+    Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
new file mode 100644
index 0000000..8e13aed
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.yaml.snakeyaml.Yaml;
+
+
+public class JinjaTemplateUtilsTest {
+
+  // Minute-granularity prefix (first 17 chars) of the rendered "today"/"yesterday" values, compared in UTC.
+  private static final DateTimeFormatter MINUTE_PREFIX_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:").withZone(ZoneOffset.UTC);
+
+  @Test
+  public void testDefaultRenderTemplate() {
+    // Bracket the renders between two clock readings and accept either reading's minute, so the test
+    // does not fail when it happens to run across a minute boundary.
+    Instant before = Instant.now();
+    String today = JinjaTemplateUtils.renderTemplate("{{ today }}").substring(0, 17);
+    String yesterday = JinjaTemplateUtils.renderTemplate("{{ yesterday }}").substring(0, 17);
+    Instant after = Instant.now();
+    assertMinuteMatches(today, before, after);
+    assertMinuteMatches(yesterday, before.minus(1, ChronoUnit.DAYS), after.minus(1, ChronoUnit.DAYS));
+  }
+
+  private static void assertMinuteMatches(String actual, Instant before, Instant after) {
+    String expectedBefore = MINUTE_PREFIX_FORMATTER.format(before);
+    String expectedAfter = MINUTE_PREFIX_FORMATTER.format(after);
+    Assert.assertTrue(actual.equals(expectedBefore) || actual.equals(expectedAfter),
+        "Expected " + expectedBefore + " or " + expectedAfter + " but got " + actual);
+  }
+
+  @Test
+  public void testRenderTemplateWithGivenContextMap() {
+    Map<String, Object> contextMap = new HashMap<>();
+    contextMap.put("first_date_2020", "2020-01-01");
+    contextMap.put("name", "xiang");
+    contextMap.put("ts", 1577836800);
+    contextMap.put("yyyy", "2020");
+    contextMap.put("MM", "05");
+    contextMap.put("dd", "06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ first_date_2020 }}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{first_date_2020}}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name }}", contextMap), "xiang");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name|upper }}", contextMap), "XIANG");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ ts }}", contextMap), "1577836800");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{ yyyy }}/{{ MM }}/{{ dd }}", contextMap),
+        "/var/rawdata/2020/05/06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{yyyy}}/{{MM}}/{{dd}}", contextMap),
+        "/var/rawdata/2020/05/06");
+  }
+
+  @Test
+  public void testGetTemplateContext() {
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "expr=a=b", "noValue", "empty="));
+    Assert.assertEquals(context.size(), 3);
+    Assert.assertEquals(context.get("year"), "2020");
+    Assert.assertEquals(context.get("expr"), "a=b");
+    Assert.assertEquals(context.get("empty"), "");
+    Assert.assertFalse(context.containsKey("noValue"));
+  }
+
+  @Test
+  public void testIngestionJobTemplate()
+      throws IOException {
+    String yamlTemplate;
+    try (InputStream resourceAsStream =
+        JinjaTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml")) {
+      Assert.assertNotNull(resourceAsStream, "ingestionJobSpecTemplate.yaml is missing from the test classpath");
+      yamlTemplate = IOUtils.toString(resourceAsStream, StandardCharsets.UTF_8);
+    }
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
+    Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
new file mode 100644
index 0000000..5032e17
--- /dev/null
+++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test fixture for templated job specs: the year, month and day placeholders below are Jinja
+# expressions that JinjaTemplateUtils renders before the YAML is parsed.
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+jobType: SegmentCreationAndTarPush
+inputDirURI: 'file:///path/to/input/{{ year }}/{{ month }}/{{ day }}'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///path/to/output/{{year}}/{{month}}/{{day}}'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'http://localhost:9000'
+pushJobSpec:
+  pushAttempts: 2
+  pushRetryIntervalMillis: 1000
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 49c31b2..3c491fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.tools.admin.command;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.tools.Command;
import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@@ -35,6 +38,10 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
@Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file")
private String _jobSpecFile;
+ // Optional so existing invocations that pass only -jobSpecFile keep working.
+ @Option(name = "-values", required = false, metaVar = "<list<string>>", handler = StringArrayOptionHandler.class, usage = "Context values set in the job spec template, as key=value pairs")
+ private List<String> _values;
+
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help = false;
@@ -47,7 +54,13 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
public boolean execute()
throws Exception {
try {
- IngestionJobLauncher.main(new String[]{_jobSpecFile});
+ List<String> arguments = new ArrayList<>();
+ arguments.add(_jobSpecFile);
+ // -values is optional and _values has no initializer, so it is null when the option is absent.
+ if (_values != null) {
+ arguments.addAll(_values);
+ }
+ IngestionJobLauncher.main(arguments.toArray(new String[0]));
} catch (Exception e) {
LOGGER.error("Got exception to kick off standalone data ingestion job -", e);
throw e;
diff --git a/pom.xml b/pom.xml
index d1494f3..846e803 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.11</scala.version>
<antlr.version>4.6</antlr.version>
+ <jinjava.version>2.5.3</jinjava.version>
<calcite.version>1.19.0</calcite.version>
<lucene.version>8.2.0</lucene.version>
<!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -962,6 +963,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.hubspot.jinjava</groupId>
+ <artifactId>jinjava</artifactId>
+ <version>${jinjava.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-babel</artifactId>
<version>${calcite.version}</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org