You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/10/26 08:36:19 UTC

[incubator-pinot] branch master updated: Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories. (#4742)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a8c6f8d  Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories. (#4742)
a8c6f8d is described below

commit a8c6f8d3bb96ea9fe7b5336bdda7fafd127d90fa
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Oct 26 01:36:10 2019 -0700

    Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories. (#4742)
    
    * Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.
    
    * Address comments
    
    * Address comments
---
 .../apache/pinot/hadoop/job/BaseSegmentJob.java    |  4 ++
 .../pinot/hadoop/job/JobConfigConstants.java       |  3 +
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 32 ++++++++-
 .../hadoop/job/mappers/SegmentCreationTest.java    | 75 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
index 702c91a..039c129 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -125,6 +125,10 @@ public abstract class BaseSegmentJob extends Configured {
       if (fileStatus.isDirectory()) {
         getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
       } else {
+        // Skip temp files generated by computation frameworks like Hadoop/Spark.
+        if (path.getName().startsWith("_") || path.getName().startsWith(".")) {
+          continue;
+        }
         if (isDataFile(path.getName())) {
           tarFilePaths.add(path);
         }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 797b7bd..c3cf9de 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -59,4 +59,7 @@ public class JobConfigConstants {
   // This setting should be used if you will generate less # of segments after
   // push. In preprocessing, this is likely because we resize segments.
   public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+
+  // This setting is used to match output segments hierarchy along with input file hierarchy.
+  public static final String USE_RELATIVE_PATH = "use.relative.path";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 689d79d..f0f5456 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
@@ -66,6 +67,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected String _rawTableName;
   protected Schema _schema;
   protected SegmentNameGenerator _segmentNameGenerator;
+  protected boolean _useRelativePath = false;
 
   // Optional
   protected TableConfig _tableConfig;
@@ -81,12 +83,30 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected File _localSegmentDir;
   protected File _localSegmentTarDir;
 
+  /**
+   * Generate a relative output directory path when `useRelativePath` flag is on.
+   * This method will compute the relative path based on `inputFile` and `baseInputDir`,
+   * then apply only the directory part of relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   */
+  protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    return new Path(outputDir, relativePath.getPath()).getParent();
+  }
+
   @Override
   public void setup(Context context)
       throws IOException, InterruptedException {
     _jobConf = context.getConfiguration();
     logConfigurations();
 
+    _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
     _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
     _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
 
@@ -195,7 +215,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     String inputFileName = hdfsInputFile.getName();
     File localInputFile = new File(_localInputDir, inputFileName);
     _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
-    FileSystem.get(hdfsInputFile.toUri(), _jobConf).copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
+    FileSystem.get(hdfsInputFile.toUri(), _jobConf)
+        .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));
 
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
     segmentGeneratorConfig.setTableName(_rawTableName);
@@ -251,8 +272,15 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
         DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
 
     Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+    if (_useRelativePath) {
+      Path relativeOutputPath =
+          getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
+              _hdfsSegmentTarDir);
+      hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
+    }
     _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
-    FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf).copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
+    FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
+        .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
 
     context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
     _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
diff --git a/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
new file mode 100644
index 0000000..75aa50b
--- /dev/null
+++ b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import java.net.URI;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCreationTest {
+
+  @Test
+  public void testBootstrapOutputPath() {
+    URI baseDir = new Path("/path/to/input").toUri();
+    URI inputFile = new Path("/path/to/input/a/b/c/d/e").toUri();
+    Path outputPath = new Path("/path/to/output");
+    Path relativeOutputPath = SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(relativeOutputPath.toString(), "/path/to/output/a/b/c/d");
+  }
+
+  @Test
+  public void testBootstrapOutputPath1() {
+    URI baseDir = new Path("/path/to/input/*/*.avro").toUri();
+    URI inputFile = new Path("/path/to/input/a/b.avro").toUri();
+    Path outputPath = new Path("/path/to/output");
+    Assert.assertThrows(() -> SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath));
+  }
+
+  @Test
+  public void testBootstrapOutputPathS3() {
+    URI baseDir = new Path("s3a://sample-s3-bucket/tmp/pinot/input").toUri();
+    URI inputFile = new Path("s3a://sample-s3-bucket/tmp/pinot/input/airlineStats_data.avro").toUri();
+    Path outputPath = new Path("s3a://sample-s3-bucket/tmp/pinot/output");
+    Path relativeOutputPath = SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(relativeOutputPath.toString(), "s3a://sample-s3-bucket/tmp/pinot/output");
+
+    baseDir = new Path("s3a://sample-s3-bucket/tmp/pinot/input").toUri();
+    inputFile = new Path("s3a://sample-s3-bucket/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro").toUri();
+    outputPath = new Path("s3a://sample-s3-bucket/tmp/pinot/output");
+    relativeOutputPath = SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(relativeOutputPath.toString(), "s3a://sample-s3-bucket/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+
+  @Test
+  public void testBootstrapOutputPathHdfs() {
+    URI baseDir = new Path("hdfs://raw-data/tmp/pinot/input").toUri();
+    URI inputFile = new Path("hdfs://raw-data/tmp/pinot/input/airlineStats_data.avro").toUri();
+    Path outputPath = new Path("hdfs://raw-data/tmp/pinot/output");
+    Path relativeOutputPath = SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(relativeOutputPath.toString(), "hdfs://raw-data/tmp/pinot/output");
+
+    baseDir = new Path("hdfs://raw-data/tmp/pinot/input").toUri();
+    inputFile = new Path("hdfs://raw-data/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro").toUri();
+    outputPath = new Path("hdfs://raw-data/tmp/pinot/output");
+    relativeOutputPath = SegmentCreationMapper.getRelativeOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(relativeOutputPath.toString(), "hdfs://raw-data/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org