You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/06 10:39:39 UTC

[incubator-pinot] branch template_ingestion_spec created (now ecd43a9)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at ecd43a9  Adding template support for Pinot Ingestion Job Spec

This branch includes the following new commits:

     new ecd43a9  Adding template support for Pinot Ingestion Job Spec

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding template support for Pinot Ingestion Job Spec

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ecd43a9e30720e7c9f67aee4243fdb619a55b42d
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 6 03:38:59 2020 -0700

    Adding template support for Pinot Ingestion Job Spec
---
 pinot-spi/pom.xml                                  |  4 +
 .../spi/ingestion/batch/IngestionJobLauncher.java  | 22 ++++--
 .../apache/pinot/spi/utils/JinjaTemplateUtils.java | 66 +++++++++++++++++
 .../ingestion/batch/IngestionJobLauncherTest.java  | 42 +++++++++++
 .../pinot/spi/utils/JinjaTemplateUtilsTest.java    | 85 ++++++++++++++++++++++
 .../test/resources/ingestionJobSpecTemplate.yaml   | 45 ++++++++++++
 .../command/LaunchDataIngestionJobCommand.java     | 19 ++++-
 pom.xml                                            |  6 ++
 8 files changed, 283 insertions(+), 6 deletions(-)

diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index 8ad0aec..b1a4b8b 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -100,6 +100,10 @@
       <artifactId>jsr305</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.hubspot.jinjava</groupId>
+      <artifactId>jinjava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9bb740a..c3ae5ea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -20,13 +20,17 @@ package org.apache.pinot.spi.ingestion.batch;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.Reader;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -49,11 +53,19 @@ public class IngestionJobLauncher {
       System.exit(1);
     }
     String jobSpecFilePath = args[0];
+    // Arrays.asList is fixed-size (remove(0) throws), so skip the spec path with a subList view.
+    List<String> valueList = Arrays.asList(args).subList(1, args.length);
+    SegmentGenerationJobSpec spec =
+        getSegmentGenerationJobSpec(jobSpecFilePath, JinjaTemplateUtils.getTemplateContext(valueList));
+    runIngestionJob(spec);
+  }
 
-    try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
-      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
-      runIngestionJob(spec);
-    }
+  /**
+   * Reads the job spec file, renders it as a Jinja template with the given context, and parses the
+   * rendered YAML into a {@link SegmentGenerationJobSpec}.
+   *
+   * @param jobSpecFilePath path of the (possibly templated) YAML job spec file
+   * @param context values made available to the template during rendering
+   * @return the job spec parsed from the rendered YAML
+   * @throws IOException if the spec file cannot be opened or read
+   */
+  public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath,
+      Map<String, Object> context)
+      throws IOException {
+    String yamlTemplate;
+    // try-with-resources: the file handle is released even when reading fails part-way through.
+    try (BufferedReader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+      yamlTemplate = IOUtils.toString(reader);
+    }
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
   }
 
   public static void runIngestionJob(SegmentGenerationJobSpec spec)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
new file mode 100644
index 0000000..76be359
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.fasterxml.jackson.databind.util.StdDateFormat;
+import com.hubspot.jinjava.Jinjava;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Static helpers for rendering Jinja templates through Jinjava and for building the context map
+ * that supplies the template values.
+ */
+public class JinjaTemplateUtils {
+
+  private static final Jinjava JINJAVA = new Jinjava();
+
+  /**
+   * Renders the template with the default context overlaid by the given values.
+   *
+   * @param template Jinja template text
+   * @param newContext values added on top of the defaults (same-named defaults are overridden);
+   *     {@code null} is treated as an empty map
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template, Map<String, Object> newContext) {
+    Map<String, Object> contextMap = getDefaultJinjaContextMap();
+    if (newContext != null) {
+      contextMap.putAll(newContext);
+    }
+    return JINJAVA.render(template, contextMap);
+  }
+
+  /**
+   * Builds a fresh, mutable context holding {@code now} (an {@link Instant}) together with
+   * {@code today} and {@code yesterday} (strings produced by {@link StdDateFormat}).
+   *
+   * @return a new map on every call; callers may modify it freely
+   */
+  public static Map<String, Object> getDefaultJinjaContextMap() {
+    // Read the clock once so all three entries describe the same moment.
+    Instant now = Instant.now();
+    // NOTE(review): DateFormat instances are generally not safe to share across threads, so a
+    // formatter is created per call instead of being kept in a static field - confirm for StdDateFormat.
+    StdDateFormat dateFormat = new StdDateFormat();
+    Map<String, Object> defaultJinjaContextMap = new HashMap<>();
+    defaultJinjaContextMap.put("now", now);
+    defaultJinjaContextMap.put("today", dateFormat.format(new Date(now.toEpochMilli())));
+    defaultJinjaContextMap
+        .put("yesterday", dateFormat.format(new Date(now.minus(1, ChronoUnit.DAYS).toEpochMilli())));
+    return defaultJinjaContextMap;
+  }
+
+  /**
+   * Converts {@code key=value} strings into a template context map.
+   *
+   * <p>Each entry is split at its first {@code '='}, so the value part may itself contain
+   * {@code '='}. Entries without {@code '='} and {@code null} entries are ignored.
+   *
+   * @param values {@code key=value} strings; {@code null} yields an empty map
+   * @return a new mutable map from key to (string) value
+   */
+  public static Map<String, Object> getTemplateContext(List<String> values) {
+    Map<String, Object> context = new HashMap<>();
+    if (values == null) {
+      return context;
+    }
+    for (String value : values) {
+      if (value == null) {
+        continue;
+      }
+      String[] splits = value.split("=", 2);
+      if (splits.length > 1) {
+        context.put(splits[0], splits[1]);
+      }
+    }
+    return context;
+  }
+
+  /**
+   * Renders the template using only the default context.
+   *
+   * @param template Jinja template text
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template) {
+    return renderTemplate(template, Collections.emptyMap());
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
new file mode 100644
index 0000000..7d54f0f
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.batch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Exercises {@link IngestionJobLauncher#getSegmentGenerationJobSpec} against a templated job spec.
+ */
+public class IngestionJobLauncherTest {
+
+  /**
+   * Loads the templated spec from the test resources with year/month/day values and checks that
+   * the placeholders in the input and output directory URIs were filled in.
+   */
+  @Test
+  public void testIngestionJobLauncherWithTemplate()
+      throws IOException {
+    String templatePath =
+        JinjaTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile();
+    Map<String, Object> templateValues =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+
+    SegmentGenerationJobSpec jobSpec =
+        IngestionJobLauncher.getSegmentGenerationJobSpec(templatePath, templateValues);
+
+    Assert.assertEquals(jobSpec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(jobSpec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
new file mode 100644
index 0000000..8e13aed
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.yaml.snakeyaml.Yaml;
+
+
+/**
+ * Tests for {@link JinjaTemplateUtils}: default context values, caller-supplied values, and
+ * rendering of a templated ingestion job spec.
+ */
+public class JinjaTemplateUtilsTest {
+
+  /**
+   * Checks that the default {@code today} and {@code yesterday} values render, down to the
+   * minute, as the current UTC time and the same time one day earlier.
+   */
+  @Test
+  public void testDefaultRenderTemplate() {
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    // The renderer reads the clock itself, so bracket each call with our own readings: the
+    // rendered minute must match one of them even if the call straddles a minute boundary.
+    Instant before = Instant.now();
+    String today = JinjaTemplateUtils.renderTemplate("{{ today }}").substring(0, 17);
+    Instant after = Instant.now();
+    assertMinuteIsOneOf(today, dateFormat, before, after);
+
+    before = Instant.now();
+    String yesterday = JinjaTemplateUtils.renderTemplate("{{ yesterday }}").substring(0, 17);
+    after = Instant.now();
+    assertMinuteIsOneOf(yesterday, dateFormat, before.minus(1, ChronoUnit.DAYS), after.minus(1, ChronoUnit.DAYS));
+  }
+
+  /** Asserts that {@code actual} equals the formatted form of either {@code earliest} or {@code latest}. */
+  private static void assertMinuteIsOneOf(String actual, SimpleDateFormat dateFormat, Instant earliest,
+      Instant latest) {
+    String lower = dateFormat.format(new Date(earliest.toEpochMilli()));
+    String upper = dateFormat.format(new Date(latest.toEpochMilli()));
+    Assert.assertTrue(actual.equals(lower) || actual.equals(upper),
+        "Rendered value '" + actual + "' matches neither '" + lower + "' nor '" + upper + "'");
+  }
+
+  /**
+   * Checks variable substitution (with and without inner spaces), a filter, a numeric value and
+   * several placeholders in one template, all using a caller-supplied context map.
+   */
+  @Test
+  public void testRenderTemplateWithGivenContextMap() {
+    Map<String, Object> contextMap = new HashMap<>();
+    contextMap.put("first_date_2020", "2020-01-01");
+    contextMap.put("name", "xiang");
+    contextMap.put("ts", 1577836800);
+    contextMap.put("yyyy", "2020");
+    contextMap.put("MM", "05");
+    contextMap.put("dd", "06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ first_date_2020 }}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{first_date_2020}}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name }}", contextMap), "xiang");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name|upper }}", contextMap), "XIANG");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ ts }}", contextMap), "1577836800");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{ yyyy }}/{{ MM }}/{{ dd }}", contextMap),
+        "/var/rawdata/2020/05/06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{yyyy}}/{{MM}}/{{dd}}", contextMap),
+        "/var/rawdata/2020/05/06");
+  }
+
+  /**
+   * Renders the templated job spec from the test resources with year/month/day values and checks
+   * the input and output directory URIs of the parsed spec.
+   */
+  @Test
+  public void testIngestionJobTemplate()
+      throws IOException {
+    String yamlTemplate;
+    // Close the classpath stream once its content has been read.
+    try (InputStream resourceAsStream =
+        JinjaTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml")) {
+      yamlTemplate = IOUtils.toString(resourceAsStream);
+    }
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
+    Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
new file mode 100644
index 0000000..5032e17
--- /dev/null
+++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+jobType: SegmentCreationAndTarPush
+inputDirURI: 'file:///path/to/input/{{ year }}/{{ month }}/{{ day }}'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///path/to/output/{{year}}/{{month}}/{{day}}'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'localhost:9000'
+pushJobSpec:
+  pushAttempts: 2
+  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 49c31b2..3c491fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,11 +18,22 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
 
 /**
@@ -35,6 +46,9 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file")
   private String _jobSpecFile;
 
+  @Option(name = "-values", required = true, metaVar = "<list<string>>", handler = StringArrayOptionHandler.class, usage = "values set in the job spec template")
+  private List<String> _values;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -47,7 +61,10 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     try {
-      IngestionJobLauncher.main(new String[]{_jobSpecFile});
+      List<String> arguments = new ArrayList();
+      arguments.add(_jobSpecFile);
+      arguments.addAll(_values);
+      IngestionJobLauncher.main(arguments.toArray(new String[0]));
     } catch (Exception e) {
       LOGGER.error("Got exception to kick off standalone data ingestion job -", e);
       throw e;
diff --git a/pom.xml b/pom.xml
index d1494f3..846e803 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
     <scala.binary.version>2.11</scala.binary.version>
     <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
+    <jinjava.version>2.5.3</jinjava.version>
     <calcite.version>1.19.0</calcite.version>
     <lucene.version>8.2.0</lucene.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -962,6 +963,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.hubspot.jinjava</groupId>
+        <artifactId>jinjava</artifactId>
+        <version>${jinjava.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-babel</artifactId>
         <version>${calcite.version}</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org