You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/05/06 10:49:32 UTC

[incubator-pinot] branch template_ingestion_spec updated (ecd43a9 -> 5815adb)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard ecd43a9  Adding template support for Pinot Ingestion Job Spec
     new 5815adb  Adding template support for Pinot Ingestion Job Spec

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ecd43a9)
            \
             N -- N -- N   refs/heads/template_ingestion_spec (5815adb)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pinot/spi/utils/JinjaTemplateUtils.java  | 21 +++++++++++++++------
 .../pinot/spi/utils/JinjaTemplateUtilsTest.java     |  8 +++-----
 2 files changed, 18 insertions(+), 11 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding template support for Pinot Ingestion Job Spec

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch template_ingestion_spec
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5815adb1617d7010d94b25b44688229dbcb455eb
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed May 6 03:38:59 2020 -0700

    Adding template support for Pinot Ingestion Job Spec
---
 pinot-spi/pom.xml                                  |  4 ++
 .../spi/ingestion/batch/IngestionJobLauncher.java  | 22 ++++--
 .../apache/pinot/spi/utils/JinjaTemplateUtils.java | 75 +++++++++++++++++++
 .../ingestion/batch/IngestionJobLauncherTest.java  | 42 +++++++++++
 .../pinot/spi/utils/JinjaTemplateUtilsTest.java    | 83 ++++++++++++++++++++++
 .../test/resources/ingestionJobSpecTemplate.yaml   | 45 ++++++++++++
 .../command/LaunchDataIngestionJobCommand.java     | 19 ++++-
 pom.xml                                            |  6 ++
 8 files changed, 290 insertions(+), 6 deletions(-)

diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index 8ad0aec..b1a4b8b 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -100,6 +100,10 @@
       <artifactId>jsr305</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.hubspot.jinjava</groupId>
+      <artifactId>jinjava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 9bb740a..c3ae5ea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -20,13 +20,17 @@ package org.apache.pinot.spi.ingestion.batch;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.Reader;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -49,11 +53,19 @@ public class IngestionJobLauncher {
       System.exit(1);
     }
     String jobSpecFilePath = args[0];
+    // Everything after the job spec path is a key=value template setting. Arrays.asList returns a
+    // fixed-size view, so calling remove(0) on it throws UnsupportedOperationException; take a
+    // sub-list view that skips the first element instead (empty when only the path was given).
+    List<String> valueList = Arrays.asList(args).subList(1, args.length);
+    SegmentGenerationJobSpec spec =
+        getSegmentGenerationJobSpec(jobSpecFilePath, JinjaTemplateUtils.getTemplateContext(valueList));
+    runIngestionJob(spec);
+  }
 
-    try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
-      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
-      runIngestionJob(spec);
-    }
+  /**
+   * Reads a job spec YAML template from disk, renders it with the given context and parses the
+   * rendered text into a {@link SegmentGenerationJobSpec}.
+   *
+   * @param jobSpecFilePath path of the job spec template file
+   * @param context template values passed to {@link JinjaTemplateUtils#renderTemplate(String, Map)}
+   * @return the job spec parsed from the rendered YAML
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static SegmentGenerationJobSpec getSegmentGenerationJobSpec(String jobSpecFilePath,
+      Map<String, Object> context)
+      throws IOException {
+    String yamlTemplate;
+    // try-with-resources closes the file even when reading fails; the reader was previously leaked.
+    try (BufferedReader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+      yamlTemplate = IOUtils.toString(reader);
+    }
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    return new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
   }
 
   public static void runIngestionJob(SegmentGenerationJobSpec spec)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
new file mode 100644
index 0000000..5d6e15b
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JinjaTemplateUtils.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.hubspot.jinjava.Jinjava;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+
+/**
+ * Utility methods for rendering Jinja templates with Jinjava.
+ *
+ * Every render starts from a default context (see {@link #getDefaultJinjaContextMap()}) onto
+ * which the caller-supplied values are put.
+ */
+public class JinjaTemplateUtils {
+
+  private static final Jinjava JINJAVA = new Jinjava();
+  private static final String DATE_PATTERN = "yyyy-MM-dd";
+
+  /**
+   * Renders the given template. The context is the default context with every entry of
+   * {@code newContext} put on top, so a supplied key replaces a default of the same name.
+   *
+   * @param template Jinja template text
+   * @param newContext values to add to (or override in) the default context
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template, Map<String, Object> newContext) {
+    Map<String, Object> contextMap = getDefaultJinjaContextMap();
+    contextMap.putAll(newContext);
+    return JINJAVA.render(template, contextMap);
+  }
+
+  /**
+   * Renders the given template with the default context only.
+   *
+   * @param template Jinja template text
+   * @return the rendered text
+   */
+  public static String renderTemplate(String template) {
+    return renderTemplate(template, Collections.emptyMap());
+  }
+
+  /**
+   * Constructs the default template context:
+   * <ul>
+   *   <li>today : today's UTC date in format `yyyy-MM-dd`, example value: '2020-05-06'</li>
+   *   <li>yesterday : the UTC date one day earlier, same format, example value: '2020-05-05'</li>
+   * </ul>
+   *
+   * @return a newly created, mutable map holding the two entries above
+   */
+  public static Map<String, Object> getDefaultJinjaContextMap() {
+    // SimpleDateFormat is not thread-safe, so build one per call instead of sharing a static instance.
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_PATTERN);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    // Both dates are derived from the same instant so they stay consistent with each other.
+    Instant now = Instant.now();
+    Map<String, Object> defaultJinjaContextMap = new HashMap<>();
+    defaultJinjaContextMap.put("today", dateFormat.format(new Date(now.toEpochMilli())));
+    defaultJinjaContextMap.put("yesterday", dateFormat.format(new Date(now.minus(1, ChronoUnit.DAYS).toEpochMilli())));
+    return defaultJinjaContextMap;
+  }
+
+  /**
+   * Builds a template context from {@code key=value} strings. Each string is split at its first
+   * '=', so a value may itself contain '='. Strings without any '=' are ignored.
+   *
+   * @param values the {@code key=value} strings
+   * @return a new map from each key to its (string) value
+   */
+  public static Map<String, Object> getTemplateContext(List<String> values) {
+    Map<String, Object> context = new HashMap<>();
+    for (String value : values) {
+      String[] splits = value.split("=", 2);
+      if (splits.length > 1) {
+        context.put(splits[0], splits[1]);
+      }
+    }
+    return context;
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
new file mode 100644
index 0000000..7d54f0f
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.batch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Exercises {@link IngestionJobLauncher#getSegmentGenerationJobSpec} against the templated job
+ * spec bundled as a test resource.
+ */
+public class IngestionJobLauncherTest {
+
+  /** The year/month/day values must be substituted into both the input and output directory URIs. */
+  @Test
+  public void testIngestionJobLauncherWithTemplate()
+      throws IOException {
+    Map<String, Object> templateValues =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    // Resolve the template on the test classpath to a file path the launcher can open.
+    String templatePath =
+        JinjaTemplateUtils.class.getClassLoader().getResource("ingestionJobSpecTemplate.yaml").getFile();
+
+    SegmentGenerationJobSpec renderedSpec =
+        IngestionJobLauncher.getSegmentGenerationJobSpec(templatePath, templateValues);
+
+    Assert.assertEquals(renderedSpec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(renderedSpec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
new file mode 100644
index 0000000..9948624
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JinjaTemplateUtilsTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.yaml.snakeyaml.Yaml;
+
+
+/**
+ * Tests for {@link JinjaTemplateUtils}: the default context, caller-supplied contexts, and
+ * rendering of the templated ingestion job spec resource.
+ */
+public class JinjaTemplateUtilsTest {
+
+  /** The built-in "today" and "yesterday" variables render as UTC dates in yyyy-MM-dd form. */
+  @Test
+  public void testDefaultRenderTemplate() {
+    // Read the clock once so the two expected dates are always derived from the same instant.
+    // NOTE: renderTemplate reads the clock itself, so a run that crosses midnight UTC can still mismatch.
+    Instant now = Instant.now();
+    Date today = new Date(now.toEpochMilli());
+    Date yesterday = new Date(now.minus(1, ChronoUnit.DAYS).toEpochMilli());
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ today }}"), dateFormat.format(today));
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ yesterday }}"), dateFormat.format(yesterday));
+  }
+
+  /** Caller-supplied values are substituted, with or without spaces inside the braces, and filters apply. */
+  @Test
+  public void testRenderTemplateWithGivenContextMap() {
+    Map<String, Object> contextMap = new HashMap<>();
+    contextMap.put("first_date_2020", "2020-01-01");
+    contextMap.put("name", "xiang");
+    contextMap.put("ts", 1577836800);
+    contextMap.put("yyyy", "2020");
+    contextMap.put("MM", "05");
+    contextMap.put("dd", "06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ first_date_2020 }}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{first_date_2020}}", contextMap), "2020-01-01");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name }}", contextMap), "xiang");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ name|upper }}", contextMap), "XIANG");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("{{ ts }}", contextMap), "1577836800");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{ yyyy }}/{{ MM }}/{{ dd }}", contextMap),
+        "/var/rawdata/2020/05/06");
+    Assert.assertEquals(JinjaTemplateUtils.renderTemplate("/var/rawdata/{{yyyy}}/{{MM}}/{{dd}}", contextMap),
+        "/var/rawdata/2020/05/06");
+  }
+
+  /** The templated job spec resource renders to YAML that parses into a spec with the dated URIs. */
+  @Test
+  public void testIngestionJobTemplate()
+      throws IOException {
+    String yamlTemplate;
+    // Close the classpath stream once it has been read; it was previously left open.
+    try (InputStream resourceAsStream =
+        JinjaTemplateUtils.class.getClassLoader().getResourceAsStream("ingestionJobSpecTemplate.yaml")) {
+      yamlTemplate = IOUtils.toString(resourceAsStream);
+    }
+    Map<String, Object> context =
+        JinjaTemplateUtils.getTemplateContext(Arrays.asList("year=2020", "month=05", "day=06"));
+    String yamlStr = JinjaTemplateUtils.renderTemplate(yamlTemplate, context);
+    SegmentGenerationJobSpec spec = new Yaml().loadAs(yamlStr, SegmentGenerationJobSpec.class);
+    Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06");
+    Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06");
+  }
+}
diff --git a/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
new file mode 100644
index 0000000..5032e17
--- /dev/null
+++ b/pinot-spi/src/test/resources/ingestionJobSpecTemplate.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test fixture: an ingestion job spec written as a Jinja template. The {{ ... }} placeholders are
+# rendered by JinjaTemplateUtils before the text is parsed as YAML; the tests supply
+# year=2020, month=05 and day=06.
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+jobType: SegmentCreationAndTarPush
+# Placeholders written with spaces inside the braces.
+inputDirURI: 'file:///path/to/input/{{ year }}/{{ month }}/{{ day }}'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+# Same placeholders written without spaces; the tests expect both forms to render identically.
+outputDirURI: 'file:///path/to/output/{{year}}/{{month}}/{{day}}'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'myTable'
+  schemaURI: 'http://localhost:9000/tables/myTable/schema'
+  tableConfigURI: 'http://localhost:9000/tables/myTable'
+pinotClusterSpecs:
+  - controllerURI: 'localhost:9000'
+pushJobSpec:
+  pushAttempts: 2
+  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 49c31b2..3c491fc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,11 +18,22 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JinjaTemplateUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
 
 /**
@@ -35,6 +46,9 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   @Option(name = "-jobSpecFile", required = true, metaVar = "<string>", usage = "Ingestion job spec file")
   private String _jobSpecFile;
 
+  // Template values are optional: a job spec without placeholders must still be launchable with only
+  // -jobSpecFile, as it was before this option existed. The field defaults to an empty list so that
+  // execute() can append it to the launcher arguments even when the option is not given.
+  @Option(name = "-values", required = false, metaVar = "<list<string>>", handler = StringArrayOptionHandler.class, usage = "values set in the job spec template")
+  private List<String> _values = new ArrayList<>();
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -47,7 +61,10 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     try {
-      IngestionJobLauncher.main(new String[]{_jobSpecFile});
+      List<String> arguments = new ArrayList();
+      arguments.add(_jobSpecFile);
+      arguments.addAll(_values);
+      IngestionJobLauncher.main(arguments.toArray(new String[0]));
     } catch (Exception e) {
       LOGGER.error("Got exception to kick off standalone data ingestion job -", e);
       throw e;
diff --git a/pom.xml b/pom.xml
index d1494f3..846e803 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
     <scala.binary.version>2.11</scala.binary.version>
     <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
+    <jinjava.version>2.5.3</jinjava.version>
     <calcite.version>1.19.0</calcite.version>
     <lucene.version>8.2.0</lucene.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -962,6 +963,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.hubspot.jinjava</groupId>
+        <artifactId>jinjava</artifactId>
+        <version>${jinjava.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-babel</artifactId>
         <version>${calcite.version}</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org