You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/06 10:39:40 UTC

[incubator-pinot] 01/01: Adding template support for Pinot Ingestion Job Spec

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ecd43a9e30720e7c9f67aee4243fdb619a55b42d
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 6 03:38:59 2020 -0700

    Adding template support for Pinot Ingestion Job Spec
---
 pinot-spi/pom.xml                                  |  4 +
 .../spi/ingestion/batch/IngestionJobLauncher.java  | 22 ++++--
 .../apache/pinot/spi/utils/JinjaTemplateUtils.java | 66 +++++++++++++++++
 .../ingestion/batch/IngestionJobLauncherTest.java  | 42 +++++++++++
 .../pinot/spi/utils/JinjaTemplateUtilsTest.java    | 85 ++++++++++++++++++++++
 .../test/resources/ingestionJobSpecTemplate.yaml   | 45 ++++++++++++
 .../command/LaunchDataIngestionJobCommand.java     | 19 ++++-
 pom.xml                                            |  6 ++
 8 files changed, 283 insertions(+), 6 deletions(-)

diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index 8ad0aec..b1a4b8b 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -100,6 +100,10 @@
       <artifactId>jsr305</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.hubspot.jinjava</groupId>
+      <artifactId>jinjava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9bb740a..c3ae5ea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -20,13 +20,17 @@ package org.apache.pinot.spi.ingestion.batch;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.Reader;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -49,11 +53,19 @@ public class IngestionJobLauncher {
       System.exit(1);
     }
     String jobSpecFilePath = args[0];
+    List<String> valueList = Arrays.asList(args);
+    valueList.remove(0);
+    SegmentGenerationJobSpec spec =
+        getSegmentGenerationJobSpec(jobSpecFilePath, JinjaTemplateUtils.getTemplateContext(valueList));
+    runIngestionJob(spec);
+  }
 
-    try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
-      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
-      runIngestionJob(spec);
-    }
+  /**
+   * Loads an ingestion job spec from a YAML file that may contain Jinja template expressions.
+   *
+   * <p>The file content is rendered with {@link JinjaTemplateUtils#renderTemplate(String, Map)} using the given
+   * context, and the rendered text is then parsed into a {@link SegmentGenerationJobSpec}.
+   *
+   * @param jobSpecFilePath path of the job spec (template) file to read
+   * @param context template variables passed to the renderer
+   * @return the job spec parsed from the rendered YAML
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath,
+      Map<String, Object> context)
+      throws IOException {
+    String yamlTemplate;
+    // try-with-resources releases the file handle even when reading fails.
+    try (BufferedReader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+      yamlTemplate = IOUtils.toString(reader);
+    }
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
   }
 
   public static void runIngestionJob(SegmentGenerationJobSpec spec)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
new file mode 100644
index 0000000..76be359
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.fasterxml.jackson.databind.util.StdDateFormat;
+import com.hubspot.jinjava.Jinjava;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Static helpers for rendering Jinja templates through Jinjava.
+ *
+ * <p>Every render starts from a default context ({@code now}, {@code today}, {@code yesterday}) and then overlays the
+ * caller-supplied values, so callers may override the defaults.
+ */
+public class JinjaTemplateUtils {
+
+  private static final Jinjava JINJAVA = new Jinjava();
+  // NOTE(review): shared across calls; confirm StdDateFormat.format is safe if rendering ever happens concurrently.
+  private static final StdDateFormat STD_DATE_FORMAT = new StdDateFormat();
+
+  /**
+   * Renders the template with the default context overlaid by {@code newContext}.
+   *
+   * @param template template text to render
+   * @param newContext extra variables; entries override defaults with the same key. {@code null} is treated as empty.
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template, Map<String, Object> newContext) {
+    Map<String, Object> contextMap = getDefaultJinjaContextMap();
+    if (newContext != null) {
+      contextMap.putAll(newContext);
+    }
+    return JINJAVA.render(template, contextMap);
+  }
+
+  /**
+   * Builds a fresh, mutable map holding the default template variables.
+   *
+   * <p>{@code now} is an {@link Instant}; {@code today} and {@code yesterday} are that instant, and the instant one
+   * day earlier, formatted as strings by {@link StdDateFormat}.
+   *
+   * @return a new map with the keys {@code now}, {@code today} and {@code yesterday}
+   */
+  public static Map<String, Object> getDefaultJinjaContextMap() {
+    // Sample the clock once so the three values always describe the same moment.
+    Instant now = Instant.now();
+    Map<String, Object> defaultJinjaContextMap = new HashMap<>();
+    defaultJinjaContextMap.put("now", now);
+    defaultJinjaContextMap.put("today", STD_DATE_FORMAT.format(Date.from(now)));
+    defaultJinjaContextMap.put("yesterday", STD_DATE_FORMAT.format(Date.from(now.minus(1, ChronoUnit.DAYS))));
+    return defaultJinjaContextMap;
+  }
+
+  /**
+   * Converts {@code key=value} strings into a template context.
+   *
+   * <p>Each entry is split on the first {@code '='} only, so values may themselves contain {@code '='}. Entries
+   * without any {@code '='} are ignored.
+   *
+   * @param values entries of the form {@code key=value}; {@code null} is treated as empty
+   * @return a new mutable map of the parsed entries
+   */
+  public static Map<String, Object> getTemplateContext(List<String> values) {
+    Map<String, Object> context = new HashMap<>();
+    if (values == null) {
+      return context;
+    }
+    for (String value : values) {
+      String[] splits = value.split("=", 2);
+      if (splits.length > 1) {
+        context.put(splits[0], splits[1]);
+      }
+    }
+    return context;
+  }
+
+  /**
+   * Renders the template using only the default context.
+   *
+   * @param template template text to render
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template) {
+    return renderTemplate(template, Collections.emptyMap());
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
new file mode 100644
index 0000000..7d54f0f
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.batch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks that {@link IngestionJobLauncher} renders a templated job spec file before parsing it.
+ */
+public class IngestionJobLauncherTest {
+
+  /**
+   * Loads the template resource with year/month/day values and verifies that both directory URIs are substituted.
+   */
+  @Test
+  public void testIngestionJobLauncherWithTemplate()
+      throws IOException {
+    Map<String, Object> templateValues =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    String templatePath =
+        JinjaTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile();
+
+    SegmentGenerationJobSpec jobSpec = IngestionJobLauncher.getSegmentGenerationJobSpec(templatePath, templateValues);
+
+    Assert.assertEquals(jobSpec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(jobSpec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
new file mode 100644
index 0000000..8e13aed
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.yaml.snakeyaml.Yaml;
+
+
+/**
+ * Tests for {@link JinjaTemplateUtils}: default context values, caller-supplied values, and a full job spec template.
+ */
+public class JinjaTemplateUtilsTest {
+
+  /**
+   * The default {@code today} and {@code yesterday} variables must match the current UTC time to the minute.
+   */
+  @Test
+  public void testDefaultRenderTemplate() {
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    // Sample the clock on both sides of the render: if the minute rolls over in between, the rendered value
+    // legitimately matches either sample, so a single-sample comparison would fail spuriously.
+    Instant before = Instant.now();
+    String today = JinjaTemplateUtils.renderTemplate("{{ today }}").substring(0, 17);
+    String yesterday = JinjaTemplateUtils.renderTemplate("{{ yesterday }}").substring(0, 17);
+    Instant after = Instant.now();
+
+    assertMinutePrefix(today, dateFormat, before, after);
+    assertMinutePrefix(yesterday, dateFormat, before.minus(1, ChronoUnit.DAYS), after.minus(1, ChronoUnit.DAYS));
+  }
+
+  /** Asserts that {@code actual} equals the formatted form of one of the two bracketing instants. */
+  private static void assertMinutePrefix(String actual, SimpleDateFormat dateFormat, Instant before, Instant after) {
+    String expectedBefore = dateFormat.format(Date.from(before));
+    String expectedAfter = dateFormat.format(Date.from(after));
+    Assert.assertTrue(actual.equals(expectedBefore) || actual.equals(expectedAfter),
+        "Expected " + expectedBefore + " or " + expectedAfter + " but got " + actual);
+  }
+
+  /**
+   * Caller-supplied variables are substituted with or without padding inside the braces, and filters apply.
+   */
+  @Test
+  public void testRenderTemplateWithGivenContextMap() {
+    Map<String, Object> contextMap = new HashMap<>();
+    contextMap.put("first_date_2020", "2020-01-01");
+    contextMap.put("name", "xiang");
+    contextMap.put("ts", 1577836800);
+    contextMap.put("yyyy", "2020");
+    contextMap.put("MM", "05");
+    contextMap.put("dd", "06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ first_date_2020 }}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{first_date_2020}}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name }}", contextMap), "xiang");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name|upper }}", contextMap), "XIANG");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ ts }}", contextMap), "1577836800");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{ yyyy }}/{{ MM }}/{{ dd }}", contextMap),
+        "/var/rawdata/2020/05/06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{yyyy}}/{{MM}}/{{dd}}", contextMap),
+        "/var/rawdata/2020/05/06");
+  }
+
+  /**
+   * Renders the job spec template resource and checks that the parsed spec carries the substituted directories.
+   */
+  @Test
+  public void testIngestionJobTemplate()
+      throws IOException {
+    String yamlTemplate;
+    // Close the classpath stream once its content has been read.
+    try (InputStream resourceAsStream =
+        JinjaTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml")) {
+      yamlTemplate = IOUtils.toString(resourceAsStream, "UTF-8");
+    }
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
+    Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
new file mode 100644
index 0000000..5032e17
--- /dev/null
+++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Ingestion job spec used as a template fixture: inputDirURI and outputDirURI contain the
+# template variables year, month and day, which are substituted when the file is rendered.
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+jobType: SegmentCreationAndTarPush
+# Placeholders written with padding inside the braces.
+inputDirURI: 'file:///path/to/input/{{ year }}/{{ month }}/{{ day }}'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+# Same variables written without padding; the tests expect both forms to render identically.
+outputDirURI: 'file:///path/to/output/{{year}}/{{month}}/{{day}}'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'localhost:9000'
+pushJobSpec:
+  pushAttempts: 2
+  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 49c31b2..3c491fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,11 +18,22 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
 
 /**
@@ -35,6 +46,9 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file")
   private String _jobSpecFile;
 
+  // Optional: a job spec without template expressions needs no values. Defaults to an empty list so the
+  // launcher arguments can be assembled without a null check.
+  @Option(name = "-values", required = false, metaVar = "<list<string>>", handler = StringArrayOptionHandler.class, usage = "values set in the job spec template")
+  private List<String> _values = new ArrayList<>();
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -47,7 +61,10 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     try {
-      IngestionJobLauncher.main(new String[]{_jobSpecFile});
+      List<String> arguments = new ArrayList();
+      arguments.add(_jobSpecFile);
+      arguments.addAll(_values);
+      IngestionJobLauncher.main(arguments.toArray(new String[0]));
     } catch (Exception e) {
       LOGGER.error("Got exception to kick off standalone data ingestion job -", e);
       throw e;
diff --git a/pom.xml b/pom.xml
index d1494f3..846e803 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
     <scala.binary.version>2.11</scala.binary.version>
     <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
+    <jinjava.version>2.5.3</jinjava.version>
     <calcite.version>1.19.0</calcite.version>
     <lucene.version>8.2.0</lucene.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -962,6 +963,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.hubspot.jinjava</groupId>
+        <artifactId>jinjava</artifactId>
+        <version>${jinjava.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-babel</artifactId>
         <version>${calcite.version}</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org