You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/10/26 08:36:19 UTC
[incubator-pinot] branch master updated: Adding bootstrap mode for
Pinot-hadoop job to output segments into relative directories. (#4742)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a8c6f8d Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories. (#4742)
a8c6f8d is described below
commit a8c6f8d3bb96ea9fe7b5336bdda7fafd127d90fa
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Oct 26 01:36:10 2019 -0700
Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories. (#4742)
* Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.
* Address comments
* Address comments
---
.../apache/pinot/hadoop/job/BaseSegmentJob.java | 4 ++
.../pinot/hadoop/job/JobConfigConstants.java | 3 +
.../hadoop/job/mappers/SegmentCreationMapper.java | 32 ++++++++-
.../hadoop/job/mappers/SegmentCreationTest.java | 75 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 2 deletions(-)
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
index 702c91a..039c129 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -125,6 +125,10 @@ public abstract class BaseSegmentJob extends Configured {
if (fileStatus.isDirectory()) {
getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
} else {
+ // Skip temp files generated by computation frameworks like Hadoop/Spark.
+ if (path.getName().startsWith("_") || path.getName().startsWith(".")) {
+ continue;
+ }
if (isDataFile(path.getName())) {
tarFilePaths.add(path);
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 797b7bd..c3cf9de 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -59,4 +59,7 @@ public class JobConfigConstants {
// This setting should be used if you will generate less # of segments after
// push. In preprocessing, this is likely because we resize segments.
public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+
+ // When enabled, output segments are written into a directory hierarchy that mirrors the input file hierarchy.
+ public static final String USE_RELATIVE_PATH = "use.relative.path";
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 689d79d..f0f5456 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
@@ -66,6 +67,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
protected String _rawTableName;
protected Schema _schema;
protected SegmentNameGenerator _segmentNameGenerator;
+ protected boolean _useRelativePath = false;
// Optional
protected TableConfig _tableConfig;
@@ -81,12 +83,30 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
protected File _localSegmentDir;
protected File _localSegmentTarDir;
+ /**
+  * Generate a relative output directory path when `useRelativePath` flag is on.
+  * This method computes the path of `inputFile` relative to `baseInputDir`,
+  * then appends only the directory part of that relative path to `outputDir`.
+  * E.g.
+  *    baseInputDir = "/path/to/input"
+  *    inputFile = "/path/to/input/a/b/c/d.avro"
+  *    outputDir = "/path/to/output"
+  *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+  *
+  * @param baseInputDir URI of the base input directory
+  * @param inputFile URI of the input file; expected to be located under `baseInputDir`
+  * @param outputDir base output directory
+  * @return `outputDir` extended with the directory part of the input file's relative path
+  * @throws IllegalStateException if `inputFile` cannot be expressed relative to `baseInputDir`
+  */
+ protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+   URI relativePath = baseInputDir.relativize(inputFile);
+   // URI.relativize returns the given URI unchanged when it cannot be relativized against the base, and
+   // yields an empty path when both URIs are identical; neither case gives a usable relative directory.
+   // The message template is only formatted on failure and reports both URIs to ease debugging.
+   Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+       "Unable to extract out the relative path for input file: %s based on base input path: %s", inputFile,
+       baseInputDir);
+   return new Path(outputDir, relativePath.getPath()).getParent();
+ }
+
@Override
public void setup(Context context)
throws IOException, InterruptedException {
_jobConf = context.getConfiguration();
logConfigurations();
+ _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
_rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
_schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
@@ -195,7 +215,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
String inputFileName = hdfsInputFile.getName();
File localInputFile = new File(_localInputDir, inputFileName);
_logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
- FileSystem.get(hdfsInputFile.toUri(), _jobConf).copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+ FileSystem.get(hdfsInputFile.toUri(), _jobConf)
+ .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
segmentGeneratorConfig.setTableName(_rawTableName);
@@ -251,8 +272,15 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+ if (_useRelativePath) {
+ Path relativeOutputPath =
+ getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+ _hdfsSegmentTarDir);
+ hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+ }
_logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
- FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf).copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+ FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+ .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
_logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
diff --git a/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
new file mode 100644
index 0000000..75aa50b
--- /dev/null
+++ b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import java.net.URI;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCreationTest {
+
+  /**
+   * Converts the given string paths the same way the tests always did (via Hadoop {@code Path}) and returns
+   * the string form of the directory computed by {@code SegmentCreationMapper.getRelativeOutputPath}.
+   */
+  private static String resolveOutputDir(String baseInputDir, String inputFile, String outputDir) {
+    URI baseUri = new Path(baseInputDir).toUri();
+    URI inputUri = new Path(inputFile).toUri();
+    Path output = new Path(outputDir);
+    return SegmentCreationMapper.getRelativeOutputPath(baseUri, inputUri, output).toString();
+  }
+
+  // Nested input file on a scheme-less path: the nested directories are appended to the output directory.
+  @Test
+  public void testBootstrapOutputPath() {
+    String actual = resolveOutputDir("/path/to/input", "/path/to/input/a/b/c/d/e", "/path/to/output");
+    Assert.assertEquals(actual, "/path/to/output/a/b/c/d");
+  }
+
+  // A base path containing glob wildcards is not a prefix of the input file, so the call is expected to throw.
+  @Test
+  public void testBootstrapOutputPath1() {
+    // Inputs are built outside the lambda so that only the call under test is covered by assertThrows.
+    URI globBase = new Path("/path/to/input/*/*.avro").toUri();
+    URI input = new Path("/path/to/input/a/b.avro").toUri();
+    Path output = new Path("/path/to/output");
+    Assert.assertThrows(() -> SegmentCreationMapper.getRelativeOutputPath(globBase, input, output));
+  }
+
+  // s3a:// URIs: a file directly under the base dir, then a file under partition-style sub-directories.
+  @Test
+  public void testBootstrapOutputPathS3() {
+    String base = "s3a://sample-s3-bucket/tmp/pinot/input";
+    String output = "s3a://sample-s3-bucket/tmp/pinot/output";
+
+    Assert.assertEquals(resolveOutputDir(base, "s3a://sample-s3-bucket/tmp/pinot/input/airlineStats_data.avro", output),
+        "s3a://sample-s3-bucket/tmp/pinot/output");
+
+    Assert.assertEquals(
+        resolveOutputDir(base, "s3a://sample-s3-bucket/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro",
+            output), "s3a://sample-s3-bucket/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+
+  // hdfs:// URIs: same two scenarios as the S3 test.
+  @Test
+  public void testBootstrapOutputPathHdfs() {
+    String base = "hdfs://raw-data/tmp/pinot/input";
+    String output = "hdfs://raw-data/tmp/pinot/output";
+
+    Assert.assertEquals(resolveOutputDir(base, "hdfs://raw-data/tmp/pinot/input/airlineStats_data.avro", output),
+        "hdfs://raw-data/tmp/pinot/output");
+
+    Assert.assertEquals(
+        resolveOutputDir(base, "hdfs://raw-data/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro",
+            output), "hdfs://raw-data/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org