You are viewing a plain text version of this content; the canonical (HTML) version is available via the mailing-list archive link that accompanied this message.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/20 11:16:30 UTC
[incubator-pinot] 01/01: Default to use local directory sequence id
for segment name generation
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch default_local_seq_id
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 27426f5d1209f713e5d215ec08483cb3f17aabba
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Feb 20 03:15:24 2021 -0800
Default to use local directory sequence id for segment name generation
---
.../batch/common/SegmentGenerationJobUtils.java | 37 ++++++
.../hadoop/HadoopSegmentGenerationJobRunner.java | 45 +++++--
.../spark/SparkSegmentGenerationJobRunner.java | 10 +-
.../standalone/SegmentGenerationJobRunner.java | 144 +++++++++++++--------
4 files changed, 162 insertions(+), 74 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
new file mode 100644
index 0000000..c89d0bc
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.common;
+
+import java.io.Serializable;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+
+
+public class SegmentGenerationJobUtils implements Serializable {
+
+ /**
+ * Returns whether segment sequence ids should be assigned per local (parent) directory
+ * during segment name generation.
+ * NOTE(review): this returns true only when the spec or its configs map is null; if configs
+ * exist but LOCAL_DIRECTORY_SEQUENCE_ID is unset, Boolean.parseBoolean(null) yields false,
+ * which contradicts "always use ... unless explicitly configured" — confirm intended default.
+ */
+ public static boolean useLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
+ if (spec == null || spec.getConfigs() == null) {
+ return true;
+ }
+ return Boolean.parseBoolean(spec.getConfigs().get(SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID));
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 513ac24..ee866da 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -31,9 +31,11 @@ import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
@@ -50,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -218,15 +221,27 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
throw new RuntimeException(errorMessage);
} else {
LOGGER.info("Creating segments with data files: {}", filteredFiles);
- for (int i = 0; i < numDataFiles; i++) {
- // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
- // hdfs:// portion of the path. Call getFileURI() to fix this up.
- URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
- File localFile = File.createTempFile("pinot-filepath-", ".txt");
- try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
- dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + i));
- dataOutputStream.flush();
- outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(i)).toUri());
+ if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+ Map<String, List<String>> localDirIndex = new HashMap<>();
+ for (String filteredFile : filteredFiles) {
+ java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+ if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+ localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
+ }
+ localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+ }
+ for (String parentPath : localDirIndex.keySet()) {
+ List<String> siblingFiles = localDirIndex.get(parentPath);
+ Collections.sort(siblingFiles);
+ for (int i = 0; i < siblingFiles.size(); i++) {
+ URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+ createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
+ }
+ }
+ } else {
+ for (int i = 0; i < numDataFiles; i++) {
+ URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+ createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i);
}
}
}
@@ -308,6 +323,18 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
}
}
+ private void createInputFileUriAndSeqIdFile(URI inputFileURI, PinotFS outputDirFS, Path stagingInputDir, int seqId)
+ throws Exception {
+ // Stages one mapper input: writes "<inputFileURI> <seqId>" to a local temp file and copies it
+ // to the staging input dir. Callers are expected to have already normalized the URI via
+ // SegmentGenerationUtils.getFileURI() (PinotFS listings typically drop the protocol prefix).
+ File localFile = File.createTempFile("pinot-filepath-", ".txt");
+ try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(localFile))) {
+ dataOutputStream.write(StringUtil.encodeUtf8(inputFileURI + " " + seqId));
+ dataOutputStream.flush();
+ outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, Integer.toString(seqId)).toUri());
+ }
+ }
+
/**
* Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
* If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index a315d15..4bb97a1 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -43,6 +43,7 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -210,7 +211,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
List<String> pathAndIdxList = new ArrayList<>();
- if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+ if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
Map<String, List<String>> localDirIndex = new HashMap<>();
for (String filteredFile : filteredFiles) {
Path filteredParentPath = Paths.get(filteredFile).getParent();
@@ -351,13 +352,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
}
- private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
- if (spec == null || spec.getConfigs() == null) {
- return false;
- }
- return Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID));
- }
-
protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir)
throws IOException {
if (depsJarDir != null) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index efe1679..7f15939 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -25,16 +25,19 @@ import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -170,62 +173,39 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
int numInputFiles = filteredFiles.size();
CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
- //iterate on the file list, for each
- for (int i = 0; i < numInputFiles; i++) {
- final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-
- //copy input path to local
- File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
- inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
-
- //create task spec
- SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
- taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
- taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
- taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
- taskSpec.setSchema(schema);
- taskSpec.setTableConfig(tableConfig);
- taskSpec.setSequenceId(i);
- taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
- taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
-
- LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
- _executorService.submit(() -> {
- File localSegmentDir = null;
- File localSegmentTarFile = null;
- try {
- //invoke segmentGenerationTask
- SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
- String segmentName = taskRunner.run();
- // Tar segment directory to compress file
- localSegmentDir = new File(localOutputTempDir, segmentName);
- String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
- localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
- LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
- long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
- long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
- LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
- DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
- //move segment to output PinotFS
- URI outputSegmentTarURI =
- SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
- .resolve(segmentTarFileName);
- if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
- LOGGER
- .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
- } else {
- outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
- } finally {
- segmentCreationTaskCountDownLatch.countDown();
- FileUtils.deleteQuietly(localSegmentDir);
- FileUtils.deleteQuietly(localSegmentTarFile);
- FileUtils.deleteQuietly(localInputDataFile);
+ //create task spec
+ SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+ taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+ taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+ taskSpec.setSchema(schema);
+ taskSpec.setTableConfig(tableConfig);
+ taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+
+ if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+ Map<String, List<String>> localDirIndex = new HashMap<>();
+ for (String filteredFile : filteredFiles) {
+ java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+ if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+ localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
}
- });
+ localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+ }
+ for (String parentPath : localDirIndex.keySet()) {
+ List<String> siblingFiles = localDirIndex.get(parentPath);
+ Collections.sort(siblingFiles);
+ for (int i = 0; i < siblingFiles.size(); i++) {
+ URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), URI.create(parentPath));
+ submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+ localOutputTempDir, taskSpec, inputFileURI, i, segmentCreationTaskCountDownLatch);
+ }
+ }
+ } else {
+ // Iterate over the file list, submitting one segment generation task per input file.
+ for (int i = 0; i < numInputFiles; i++) {
+ final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
+ submitSegmentGenTask(inputDirFS, outputDirFS, inputDirURI, outputDirURI, localInputTempDir,
+ localOutputTempDir, taskSpec, inputFileURI, i, segmentCreationTaskCountDownLatch);
+ }
}
segmentCreationTaskCountDownLatch.await();
} finally {
@@ -234,4 +214,54 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
_executorService.shutdown();
}
}
+
+ private void submitSegmentGenTask(PinotFS inputDirFS, PinotFS outputDirFS, URI inputDirURI, URI outputDirURI,
+ File localInputTempDir, File localOutputTempDir, SegmentGenerationTaskSpec taskSpec, URI inputFileURI, int seqId,
+ CountDownLatch segmentCreationTaskCountDownLatch)
+ throws Exception {
+ //copy input path to local
+ File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+ inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
+
+ //Update task spec
+ taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+ taskSpec.setSequenceId(seqId);
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+
+ LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+ _executorService.submit(() -> {
+ File localSegmentDir = null;
+ File localSegmentTarFile = null;
+ try {
+ //invoke segmentGenerationTask
+ SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+ String segmentName = taskRunner.run();
+ // Tar segment directory to compress file
+ localSegmentDir = new File(localOutputTempDir, segmentName);
+ String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
+ localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+ //move segment to output PinotFS
+ URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+ .resolve(segmentTarFileName);
+ if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+ LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+ } else {
+ outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+ } finally {
+ segmentCreationTaskCountDownLatch.countDown();
+ FileUtils.deleteQuietly(localSegmentDir);
+ FileUtils.deleteQuietly(localSegmentTarFile);
+ FileUtils.deleteQuietly(localInputDataFile);
+ }
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org