Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/20 11:16:29 UTC

[incubator-pinot] branch default_local_seq_id created (now 27426f5)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch default_local_seq_id
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 27426f5  Default to use local directory sequence id for segment name generation

This branch includes the following new commits:

     new 27426f5  Default to use local directory sequence id for segment name generation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Default to use local directory sequence id for segment name generation

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch default_local_seq_id
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 27426f5d1209f713e5d215ec08483cb3f17aabba
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Feb 20 03:15:24 2021 -0800

    Default to use local directory sequence id for segment name generation
---
 .../batch/common/SegmentGenerationJobUtils.java    |  37 ++++++
 .../hadoop/HadoopSegmentGenerationJobRunner.java   |  45 +++++--
 .../spark/SparkSegmentGenerationJobRunner.java     |  10 +-
 .../standalone/SegmentGenerationJobRunner.java     | 144 +++++++++++++--------
 4 files changed, 162 insertions(+), 74 deletions(-)
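
For context: the core change, repeated across the Hadoop, Spark, and standalone
runners below, groups input files by parent directory, sorts each group, and
assigns sequence ids within each directory instead of one global counter across
the whole input tree. A minimal sketch of that grouping follows; it is a
simplified stand-in (the real runners resolve each file through
SegmentGenerationUtils.getFileURI and submit runner-specific tasks, and the
SeqIdConsumer callback here is hypothetical):

    import java.net.URI;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LocalDirSeqIdSketch {

      /** Hypothetical callback standing in for each runner's task submission. */
      interface SeqIdConsumer {
        void accept(URI inputFileURI, int seqId) throws Exception;
      }

      static void assignLocalDirectorySequenceIds(List<String> filteredFiles, SeqIdConsumer consumer)
          throws Exception {
        // Group files by parent directory, as the three runners below do.
        Map<String, List<String>> localDirIndex = new HashMap<>();
        for (String filteredFile : filteredFiles) {
          String parent = Paths.get(filteredFile).getParent().toString();
          localDirIndex.computeIfAbsent(parent, k -> new ArrayList<>()).add(filteredFile);
        }
        // Within each directory, sort the siblings and number them from 0.
        for (Map.Entry<String, List<String>> entry : localDirIndex.entrySet()) {
          List<String> siblingFiles = entry.getValue();
          Collections.sort(siblingFiles);
          for (int i = 0; i < siblingFiles.size(); i++) {
            // The real runners resolve the file via SegmentGenerationUtils.getFileURI here.
            consumer.accept(URI.create(siblingFiles.get(i)), i);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        List<String> files = List.of(
            "/input/2021/02/19/part-1.csv",
            "/input/2021/02/19/part-0.csv",
            "/input/2021/02/20/part-0.csv");
        // Prints seq ids 0 and 1 for the sorted 2021/02/19 files, and 0 for 2021/02/20.
        assignLocalDirectorySequenceIds(files, (uri, seqId) -> System.out.println(uri + " -> " + seqId));
      }
    }

With per-directory ids, a file's sequence id depends only on its siblings, so
adding a new input directory does not renumber the files in other directories.
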

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
new file mode 100644
index 0000000..c89d0bc
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.common;
+
+import java.io.Serializable;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+
+
+public class SegmentGenerationJobUtils implements Serializable {
+
+  /**
+   * Always use local directory sequence id unless explicitly configured otherwise.
+   */
+  public static boolean useLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
+    if (spec == null || spec.getConfigs() == null) {
+      return true;
+    }
+    String useLocalDirSequenceId = spec.getConfigs().get(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID);
+    // Default to true when the key is absent, so local directory sequence ids stay the default.
+    return useLocalDirSequenceId == null || Boolean.parseBoolean(useLocalDirSequenceId);
+  }
+}
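
As a usage sketch of the new helper (assuming SegmentNameGeneratorSpec is a
plain spec POJO with a no-arg constructor and a setConfigs(Map<String, String>)
setter, neither of which this diff shows):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
    import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
    import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;

    public class SeqIdConfigSketch {
      public static void main(String[] args) {
        // With no spec (or no configs), local directory sequence ids are the default.
        System.out.println(SegmentGenerationJobUtils.useLocalDirectorySequenceId(null));  // true

        // Opting out requires an explicit "false" in the generator spec's config map.
        SegmentNameGeneratorSpec spec = new SegmentNameGeneratorSpec();
        Map<String, String> configs = new HashMap<>();
        configs.put(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID, "false");
        spec.setConfigs(configs);
        System.out.println(SegmentGenerationJobUtils.useLocalDirectorySequenceId(spec));  // false
      }
    }
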
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 513ac24..ee866da 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -31,9 +31,11 @@ import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -218,15 +221,27 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       throw new RuntimeException(errorMessage);
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
-      for (int i = 0; i < numDataFiles; i++) {
-        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
-        // hdfs:// portion of the path. Call getFileURI() to fix this up.
-        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-        File localFile = File.createTempFile("pinot-filepath-", ".txt");
-        try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
-          dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + i));
-          dataOutputStream.flush();
-          outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(i)).toUri());
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
+          }
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+        }
+        for (String parentPath : localDirIndex.keySet()) {
+          List<String> siblingFiles = localDirIndex.get(parentPath);
+          Collections.sort(siblingFiles);
+          for (int i = 0; i < siblingFiles.size(); i++) {
+            URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+            createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
+          }
+        }
+      } else {
+        for (int i = 0; i < numDataFiles; i++) {
+          URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+          createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
         }
       }
     }
@@ -308,6 +323,18 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
     }
   }
 
+  private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirFS, Path stagingInputDir, int seqId)
+      throws Exception {
+    // The caller has already normalized inputFileURI via getFileURI(), since PinotFS implementations
+    // typically list files without a protocol (losing, for example, the hdfs:// prefix).
+    // Write "<inputFileURI> <seqId>" to a one-line staging file that drives one mapper task.
+    File localFile = File.createTempFile("pinot-filepath-", ".txt");
+    try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
+      dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + seqId));
+      dataOutputStream.flush();
+      outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(seqId)).toUri());
+    }
+  }
+
   /**
    * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
    * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
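
The Hadoop runner hands each mapper its input URI and sequence id through the
one-line staging file written by createInputFileUriAndSeqIdFile above, in the
form "<inputFileURI> <seqId>". A sketch of the matching parse on the mapper
side (the mapper itself is not part of this diff, so this is an assumption
about how the line is consumed):

    import java.net.URI;

    public class SeqIdLineSketch {
      public static void main(String[] args) {
        // One staging line, as written by createInputFileUriAndSeqIdFile.
        String line = "hdfs://namenode/data/2021/02/20/part-0.csv 0";
        // Split on the last space: the URI portion itself contains no spaces.
        int idx = line.lastIndexOf(' ');
        URI inputFileURI = URI.create(line.substring(0, idx));
        int seqId = Integer.parseInt(line.substring(idx + 1));
        System.out.println(inputFileURI + " -> sequence id " + seqId);
      }
    }
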
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..4bb97a1 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -43,6 +43,7 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -210,7 +211,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
 
       List<String> pathAndIdxList = new ArrayList<>();
-      if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
         Map<String, List<String>> localDirIndex = new HashMap<>();
         for (String filteredFile : filteredFiles) {
           Path filteredParentPath = Paths.get(filteredFile).getParent();
@@ -351,13 +352,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
     }
   }
 
-  private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
-    if (spec == null || spec.getConfigs() == null) {
-      return false;
-    }
-    return Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID));
-  }
-
   protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir)
       throws IOException {
     if (depsJarDir != null) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..7f15939 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -25,16 +25,19 @@ import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -170,62 +173,39 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
 
       int numInputFiles = filteredFiles.size();
       CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
-      //iterate on the file list, for each
-      for (int i = 0; i < numInputFiles; i++) {
-        final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-
-        //copy input path to local
-        File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
-        inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
-
-        //create task spec
-        SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
-        taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
-        taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
-        taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        taskSpec.setSchema(schema);
-        taskSpec.setTableConfig(tableConfig);
-        taskSpec.setSequenceId(i);
-        taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
-        taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
-
-        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
-        _executorService.submit(() -> {
-          File localSegmentDir = null;
-          File localSegmentTarFile = null;
-          try {
-            //invoke segmentGenerationTask
-            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-            String segmentName = taskRunner.run();
-            // Tar segment directory to compress file
-            localSegmentDir = new File(localOutputTempDir, segmentName);
-            String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
-            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-            //move segment to output PinotFS
-            URI outputSegmentTarURI =
-                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-                    .resolve(segmentTarFileName);
-            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-              LOGGER
-                  .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-            } else {
-              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-            }
-          } catch (Exception e) {
-            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
-          } finally {
-            segmentCreationTaskCountDownLatch.countDown();
-            FileUtils.deleteQuietly(localSegmentDir);
-            FileUtils.deleteQuietly(localSegmentTarFile);
-            FileUtils.deleteQuietly(localInputDataFile);
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
           }
-        });
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+        }
+        for (String parentPath : localDirIndex.keySet()) {
+          List<String> siblingFiles = localDirIndex.get(parentPath);
+          Collections.sort(siblingFiles);
+          for (int i = 0; i < siblingFiles.size(); i++) {
+            URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+            submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+                localOutputTempDir, schema, tableConfig, inputFileURI, i, segmentCreationTaskCountDownLatch);
+          }
+        }
+      } else {
+        //iterate on the file list, for each
+        for (int i = 0; i < numInputFiles; i++) {
+          final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+          submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+              localOutputTempDir, schema, tableConfig, inputFileURI, i, segmentCreationTaskCountDownLatch);
+        }
       }
       segmentCreationTaskCountDownLatch.await();
     } finally {
@@ -234,4 +214,54 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       _executorService.shutdown();
     }
   }
+
+  private void submitSegmentGenTask(PinotFS inputDirFS, PinotFS outputDirFS, URI inputDirURI, URI outputDirURI,
+      File localInputTempDir, File localOutputTempDir, Schema schema, TableConfig tableConfig, URI inputFileURI,
+      int seqId, CountDownLatch segmentCreationTaskCountDownLatch)
+      throws Exception {
+    //copy input path to local
+    File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+    inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
+
+    //Create a fresh task spec per task: the spec is captured by the async lambda below, so reusing
+    //a single mutable instance across submissions would let later mutations race with running tasks.
+    SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+    taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+    taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+    taskSpec.setSchema(schema);
+    taskSpec.setTableConfig(tableConfig);
+    taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+    taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+    taskSpec.setSequenceId(seqId);
+    taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+
+    LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+    _executorService.submit(() -> {
+      File localSegmentDir = null;
+      File localSegmentTarFile = null;
+      try {
+        //invoke segmentGenerationTask
+        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+        String segmentName = taskRunner.run();
+        // Tar segment directory to compress file
+        localSegmentDir = new File(localOutputTempDir, segmentName);
+        String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
+        localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+        //move segment to output PinotFS
+        URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+            .resolve(segmentTarFileName);
+        if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+          LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+        } else {
+          outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+      } finally {
+        segmentCreationTaskCountDownLatch.countDown();
+        FileUtils.deleteQuietly(localSegmentDir);
+        FileUtils.deleteQuietly(localSegmentTarFile);
+        FileUtils.deleteQuietly(localInputDataFile);
+      }
+    });
+  }
 }
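
In the standalone runner, the tarred segment lands under the output directory
at the input file's relative location via getRelativeOutputPath. A sketch of
that resolution, under the assumption that getRelativeOutputPath simply
re-roots the input file's parent path from inputDirURI to outputDirURI (the
tar file name below is hypothetical):

    import java.net.URI;

    public class OutputPathSketch {
      public static void main(String[] args) {
        URI inputDirURI = URI.create("hdfs://nn/input/");
        URI inputFileURI = URI.create("hdfs://nn/input/2021/02/20/part-0.csv");
        URI outputDirURI = URI.create("hdfs://nn/output/");
        // Re-root the input file's parent directory under the output dir...
        URI outputParent = outputDirURI.resolve(inputDirURI.relativize(inputFileURI)).resolve(".");
        // ...then resolve the (URL-encoded) segment tar name against it.
        URI outputSegmentTarURI = outputParent.resolve("mytable_2021-02-20_0.tar.gz");
        System.out.println(outputSegmentTarURI);
        // -> hdfs://nn/output/2021/02/20/mytable_2021-02-20_0.tar.gz
      }
    }
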


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org