Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/20 11:16:30 UTC

[incubator-pinot] 01/01: Default to use local directory sequence id for segment name generation

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch default_local_seq_id
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 27426f5d1209f713e5d215ec08483cb3f17aabba
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Feb 20 03:15:24 2021 -0800

    Default to use local directory sequence id for segment name generation
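    
    Previously the runners numbered input files with one global counter, and
    per-directory numbering had to be enabled explicitly; with this change the
    per-directory behavior becomes the default, so a file's sequence id comes
    from its sorted position within its parent directory. A job can still opt
    out through the segment name generator configs. A minimal sketch (jobSpec
    stands in for the SegmentGenerationJobSpec being built; the runners read
    the key via SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID):
    
        SegmentNameGeneratorSpec nameGenSpec = new SegmentNameGeneratorSpec();
        Map<String, String> configs = new HashMap<>();
        configs.put(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID, "false");
        nameGenSpec.setConfigs(configs);
        jobSpec.setSegmentNameGeneratorSpec(nameGenSpec);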
---
 .../batch/common/SegmentGenerationJobUtils.java    |  37 ++++++
 .../hadoop/HadoopSegmentGenerationJobRunner.java   |  45 +++++--
 .../spark/SparkSegmentGenerationJobRunner.java     |  10 +-
 .../standalone/SegmentGenerationJobRunner.java     | 144 +++++++++++++--------
 4 files changed, 162 insertions(+), 74 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
new file mode 100644
index 0000000..c89d0bc
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.common;
+
+import java.io.Serializable;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+
+
+public class SegmentGenerationJobUtils implements Serializable {
+
+  /**
+   * Returns whether to use the local directory sequence id for segment name
+   * generation. Defaults to true unless explicitly disabled in the configs.
+   */
+  public static boolean useLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
+    if (spec == null || spec.getConfigs() == null) {
+      return true;
+    }
+    return Boolean.parseBoolean(spec.getConfigs().getOrDefault(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID, "true"));
+  }
+}
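
For reference, the resolution semantics of the new helper, sketched as a tiny
driver (this assumes the draft's default-true lookup above;
LOCAL_DIRECTORY_SEQUENCE_ID is the config-key constant from
SegmentGenerationTaskRunner, which lives in the same package):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
    import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
    import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;

    public class UseLocalSeqIdResolution {
      public static void main(String[] args) {
        // No spec, or a spec without configs -> default to per-directory ids.
        System.out.println(SegmentGenerationJobUtils.useLocalDirectorySequenceId(null));  // true

        // Key explicitly set to "false" -> fall back to one global counter.
        SegmentNameGeneratorSpec spec = new SegmentNameGeneratorSpec();
        Map<String, String> configs = new HashMap<>();
        configs.put(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID, "false");
        spec.setConfigs(configs);
        System.out.println(SegmentGenerationJobUtils.useLocalDirectorySequenceId(spec));  // false
      }
    }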
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 513ac24..ee866da 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -31,9 +31,11 @@ import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -218,15 +221,27 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       throw new RuntimeException(errorMessage);
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
-      for (int i = 0; i < numDataFiles; i++) {
-        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
-        // hdfs:// portion of the path. Call getFileURI() to fix this up.
-        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-        File localFile = File.createTempFile("pinot-filepath-", ".txt");
-        try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
-          dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + i));
-          dataOutputStream.flush();
-          outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(i)).toUri());
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
+          }
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+        }
+        for (String parentPath : localDirIndex.keySet()) {
+          List<String> siblingFiles = localDirIndex.get(parentPath);
+          Collections.sort(siblingFiles);
+          for (int i = 0; i < siblingFiles.size(); i++) {
+            URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+            createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
+          }
+        }
+      } else {
+        for (int i = 0; i < numDataFiles; i++) {
+          URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+          createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
         }
       }
     }
@@ -308,6 +323,18 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
     }
   }
 
+  private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirFS, Path stagingInputDir, int seqId)
+      throws Exception {
+    // The inputFileURI was produced by SegmentGenerationUtils.getFileURI(), so it retains its
+    // scheme (e.g. hdfs://). Stage it as a one-line "<inputFileURI> <seqId>" file for the mapper.
+    File localFile = File.createTempFile("pinot-filepath-", ".txt");
+    try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
+      dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + seqId));
+      dataOutputStream.flush();
+      outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(seqId)).toUri());
+    }
+  }
+
   /**
    * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
    * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
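
The heart of the change is the grouping above: sibling files are sorted and
numbered from zero within each parent directory instead of by one global
counter over the whole input. A standalone sketch of that numbering, with
hypothetical paths and plain JDK types (TreeMap instead of the runner's
HashMap, only so the printed order is deterministic):

    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class LocalSeqIdSketch {
      public static void main(String[] args) {
        List<String> filteredFiles = Arrays.asList(
            "/input/2021/02/20/b.csv", "/input/2021/02/19/a.csv", "/input/2021/02/19/b.csv");
        // Group files by parent directory, as the runners do.
        Map<String, List<String>> localDirIndex = new TreeMap<>();
        for (String file : filteredFiles) {
          localDirIndex.computeIfAbsent(Paths.get(file).getParent().toString(), k -> new ArrayList<>()).add(file);
        }
        // Sort the siblings and assign sequence ids per directory.
        for (Map.Entry<String, List<String>> entry : localDirIndex.entrySet()) {
          List<String> siblingFiles = entry.getValue();
          Collections.sort(siblingFiles);
          for (int i = 0; i < siblingFiles.size(); i++) {
            System.out.println(siblingFiles.get(i) + " -> seqId " + i);
          }
        }
        // Prints:
        //   /input/2021/02/19/a.csv -> seqId 0
        //   /input/2021/02/19/b.csv -> seqId 1
        //   /input/2021/02/20/b.csv -> seqId 0
      }
    }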
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..4bb97a1 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -43,6 +43,7 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -210,7 +211,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
 
       List<String> pathAndIdxList = new ArrayList<>();
-      if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
         Map<String, List<String>> localDirIndex = new HashMap<>();
         for (String filteredFile : filteredFiles) {
           Path filteredParentPath = Paths.get(filteredFile).getParent();
@@ -351,13 +352,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
     }
   }
 
-  private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
-    if (spec == null || spec.getConfigs() == null) {
-      return false;
-    }
-    return Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID));
-  }
-
   protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir)
       throws IOException {
     if (depsJarDir != null) {
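
The Hadoop runner stages each (input file, sequence id) pair as a one-line
text file of the form "<inputFileURI> <seqId>", and the Spark runner's
pathAndIdxList appears to carry the same pairing (the encoding side is not
shown in this hunk). A round-trip sketch of that format, with hypothetical
helper names; splitting on the last space is safe because a well-formed URI
contains no literal spaces:

    import java.net.URI;

    public class PathAndIdxCodec {
      // Matches the "<inputFileURI> <seqId>" line written by createInputFileUriAndSeqIdFile.
      static String encode(URI inputFileURI, int seqId) {
        return inputFileURI + " " + seqId;
      }

      static URI decodeUri(String line) {
        return URI.create(line.substring(0, line.lastIndexOf(' ')));
      }

      static int decodeSeqId(String line) {
        return Integer.parseInt(line.substring(line.lastIndexOf(' ') + 1));
      }

      public static void main(String[] args) {
        String line = encode(URI.create("hdfs://nn/input/2021/02/19/a.csv"), 0);
        System.out.println(decodeUri(line) + " / " + decodeSeqId(line));
        // hdfs://nn/input/2021/02/19/a.csv / 0
      }
    }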
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..7f15939 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -25,16 +25,19 @@ import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -170,62 +173,32 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
 
       int numInputFiles = filteredFiles.size();
       CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
-      //iterate on the file list, for each
-      for (int i = 0; i < numInputFiles; i++) {
-        final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-
-        //copy input path to local
-        File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
-        inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
-
-        //create task spec
-        SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
-        taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
-        taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
-        taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        taskSpec.setSchema(schema);
-        taskSpec.setTableConfig(tableConfig);
-        taskSpec.setSequenceId(i);
-        taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
-        taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
-
-        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
-        _executorService.submit(() -> {
-          File localSegmentDir = null;
-          File localSegmentTarFile = null;
-          try {
-            //invoke segmentGenerationTask
-            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-            String segmentName = taskRunner.run();
-            // Tar segment directory to compress file
-            localSegmentDir = new File(localOutputTempDir, segmentName);
-            String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
-            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-            //move segment to output PinotFS
-            URI outputSegmentTarURI =
-                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-                    .resolve(segmentTarFileName);
-            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-              LOGGER
-                  .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-            } else {
-              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-            }
-          } catch (Exception e) {
-            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
-          } finally {
-            segmentCreationTaskCountDownLatch.countDown();
-            FileUtils.deleteQuietly(localSegmentDir);
-            FileUtils.deleteQuietly(localSegmentTarFile);
-            FileUtils.deleteQuietly(localInputDataFile);
+
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
           }
-        });
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+        }
+        for (String parentPath : localDirIndex.keySet()) {
+          List<String> siblingFiles = localDirIndex.get(parentPath);
+          Collections.sort(siblingFiles);
+          for (int i = 0; i < siblingFiles.size(); i++) {
+            URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+            submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+                localOutputTempDir, schema, tableConfig, inputFileURI, i, segmentCreationTaskCountDownLatch);
+          }
+        }
+      } else {
+        //iterate on the file list and submit one segment generation task per file
+        for (int i = 0; i < numInputFiles; i++) {
+          final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+          submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+              localOutputTempDir, schema, tableConfig, inputFileURI, i, segmentCreationTaskCountDownLatch);
+        }
       }
       segmentCreationTaskCountDownLatch.await();
     } finally {
@@ -234,4 +207,61 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       _executorService.shutdown();
     }
   }
+
+  private void submitSegmentGenTask(PinotFS inputDirFS, PinotFS outputDirFS, URI inputDirURI, URI outputDirURI,
+      File localInputTempDir, File localOutputTempDir, Schema schema, TableConfig tableConfig, URI inputFileURI,
+      int seqId, CountDownLatch segmentCreationTaskCountDownLatch)
+      throws Exception {
+    //copy input path to local
+    File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+    inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
+
+    //Create a fresh task spec per task: the submitted lambda reads the spec only when the
+    //task actually runs, so mutating one shared instance across submissions would race.
+    SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+    taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+    taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+    taskSpec.setSchema(schema);
+    taskSpec.setTableConfig(tableConfig);
+    taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+    taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+    taskSpec.setSequenceId(seqId);
+    taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+
+    LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+    _executorService.submit(() -> {
+      File localSegmentDir = null;
+      File localSegmentTarFile = null;
+      try {
+        //invoke segmentGenerationTask
+        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+        String segmentName = taskRunner.run();
+        // Tar the segment directory into a compressed archive
+        localSegmentDir = new File(localOutputTempDir, segmentName);
+        String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
+        localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+        //move segment to output PinotFS
+        URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+            .resolve(segmentTarFileName);
+        if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+          LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+        } else {
+          outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+      } finally {
+        segmentCreationTaskCountDownLatch.countDown();
+        FileUtils.deleteQuietly(localSegmentDir);
+        FileUtils.deleteQuietly(localSegmentTarFile);
+        FileUtils.deleteQuietly(localInputDataFile);
+      }
+    });
+  }
 }
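
A note on building the SegmentGenerationTaskSpec inside submitSegmentGenTask:
the submitted lambda only reads the spec when its task actually runs, so one
spec instance mutated across submissions would hand late-arriving values to
earlier tasks. A minimal, simplified sketch of that hazard, with hypothetical
types and plain JDK concurrency:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SharedSpecHazard {
      static class Spec { volatile String inputFile; }

      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Spec shared = new Spec();
        for (int i = 0; i < 3; i++) {
          shared.inputFile = "file-" + i;  // races with tasks still waiting in the queue
          pool.submit(() -> System.out.println("processing " + shared.inputFile));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // May print "processing file-2" three times: queued tasks see the latest
        // mutation, which is why the runner builds a fresh spec per task.
      }
    }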

