Posted to commits@pinot.apache.org by xi...@apache.org on 2019/10/24 06:37:21 UTC

[incubator-pinot] branch hadoop-bootstrap-mode created (now bdb5a94)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch hadoop-bootstrap-mode
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at bdb5a94  Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.

This branch includes the following new commits:

     new bdb5a94  Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[incubator-pinot] 01/01: Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.

Posted by xi...@apache.org.

xiangfu pushed a commit to branch hadoop-bootstrap-mode
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit bdb5a94b030874fdaee0438c59a3493885b4e277
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Oct 23 23:33:20 2019 -0700

    Adding bootstrap mode for Pinot-hadoop job to output segments into relative directories.
---
 .../apache/pinot/hadoop/job/BaseSegmentJob.java    |  4 ++
 .../pinot/hadoop/job/JobConfigConstants.java       |  1 +
 .../hadoop/job/mappers/SegmentCreationMapper.java  | 30 ++++++++++++
 .../hadoop/job/mappers/SegmentCreationTest.java    | 54 ++++++++++++++++++++++
 4 files changed, 89 insertions(+)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
index 702c91a..039c129 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -125,6 +125,10 @@ public abstract class BaseSegmentJob extends Configured {
       if (fileStatus.isDirectory()) {
         getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
       } else {
+        // Skip temp files generated by computation frameworks like Hadoop/Spark.
+        if (path.getName().startsWith("_") || path.getName().startsWith(".")) {
+          continue;
+        }
         if (isDataFile(path.getName())) {
           tarFilePaths.add(path);
         }
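
The guard added above follows the Hadoop/Spark convention that output files
whose names start with "_" or "." (for example "_SUCCESS", "_temporary", or
".part-00000.crc") are framework bookkeeping rather than data. A minimal
standalone sketch of the same predicate (the helper name is hypothetical,
for illustration only):

    // Hypothetical helper equivalent to the check added above.
    static boolean isFrameworkTempFile(String fileName) {
      // Hadoop and Spark emit marker/temp files such as "_SUCCESS",
      // "_temporary" and ".part-00000.crc"; none of them are data files.
      return fileName.startsWith("_") || fileName.startsWith(".");
    }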
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 797b7bd..6c43268 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -59,4 +59,5 @@ public class JobConfigConstants {
   // This setting should be used if you will generate less # of segments after
   // push. In preprocessing, this is likely because we resize segments.
   public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments";
+  public static final String BOOTSTRAP_JOB = "job.bootstrap";
 }
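
The new "job.bootstrap" key defaults to off: the mapper reads it with
getBoolean(BOOTSTRAP_JOB, false), as shown in the SegmentCreationMapper diff
below. A minimal sketch of enabling it programmatically on a Hadoop
Configuration (how the flag is populated from a job spec file is outside
this diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pinot.hadoop.job.JobConfigConstants;

    public class EnableBootstrapExample {
      public static Configuration withBootstrap(Configuration conf) {
        // With the flag unset, the mapper keeps writing all segment
        // tars into the flat output directory as before.
        conf.setBoolean(JobConfigConstants.BOOTSTRAP_JOB, true);
        return conf;
      }
    }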
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 689d79d..1f4dd26 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -66,6 +68,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected String _rawTableName;
   protected Schema _schema;
   protected SegmentNameGenerator _segmentNameGenerator;
+  protected boolean _bootstrapJob = false;
 
   // Optional
   protected TableConfig _tableConfig;
@@ -87,6 +90,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     _jobConf = context.getConfiguration();
     logConfigurations();
 
+    _bootstrapJob = _jobConf.getBoolean(JobConfigConstants.BOOTSTRAP_JOB, false);
     _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
     _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
 
@@ -251,6 +255,14 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
         DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize));
 
     Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
+    if (_bootstrapJob) {
+      Path bootstrapOutputPath =
+          getBootstrapOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(),
+              hdfsInputFile.toUri(), _hdfsSegmentTarDir);
+      if (bootstrapOutputPath != null) {
+        hdfsSegmentTarFile = new Path(bootstrapOutputPath, segmentTarFileName);
+      }
+    }
     _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
     FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf).copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);
 
@@ -259,6 +271,24 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
         sequenceId);
   }
 
+  /**
+   * Generate an output directory path for bootstrap mode.
+   * This method will compute the relative path based on `inputFile` and `baseInputDir`,
+   * then apply only the directory part of the relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getBootstrapOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c
+   */
+  protected static Path getBootstrapOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    if (relativePath.getPath().length() > 0) {
+      return new Path(outputDir, relativePath.getPath()).getParent();
+    }
+    return null;
+  }
+
   protected FileFormat getFileFormat(String fileName) {
     if (fileName.endsWith(".avro")) {
       return FileFormat.AVRO;
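
The helper above reduces to java.net.URI.relativize(): when baseInputDir is
a proper prefix of inputFile, the result is the remaining relative path;
when the two URIs are equal, the result is an empty path, which is why the
method returns null and the caller keeps the flat output directory. A
self-contained sketch of that arithmetic, reusing the javadoc's example:

    import java.net.URI;

    public class RelativizeDemo {
      public static void main(String[] args) {
        URI baseInputDir = URI.create("/path/to/input");
        URI inputFile = URI.create("/path/to/input/a/b/c/d.avro");

        // Prints "a/b/c/d.avro": the base prefix is stripped; the mapper
        // appends this to the output dir and takes the parent, yielding
        // /path/to/output/a/b/c.
        System.out.println(baseInputDir.relativize(inputFile).getPath());

        // Prints an empty line: equal URIs relativize to an empty path,
        // the case in which getBootstrapOutputPath() returns null.
        System.out.println(baseInputDir.relativize(baseInputDir).getPath());
      }
    }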
diff --git a/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
new file mode 100644
index 0000000..cd5b55c
--- /dev/null
+++ b/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationTest.java
@@ -0,0 +1,54 @@
+package org.apache.pinot.hadoop.job.mappers;
+
+import java.net.URI;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCreationTest {
+
+  @Test
+  public void testBootstrapOutputPath() {
+    URI baseDir = new Path("/path/to/input").toUri();
+    URI inputFile = new Path("/path/to/input/a/b/c/d/e").toUri();
+    Path outputPath = new Path("/path/to/output");
+    Path bootstrapOutputPath = SegmentCreationMapper.getBootstrapOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(bootstrapOutputPath.toString(), "/path/to/output/a/b/c/d");
+  }
+
+  @Test
+  public void testBootstrapOutputPathS3() {
+    URI baseDir = new Path("s3a://sample-s3-bucket/tmp/pinot/input").toUri();
+    URI inputFile = new Path("s3a://sample-s3-bucket/tmp/pinot/input/airlineStats_data.avro").toUri();
+    Path outputPath = new Path("s3a://sample-s3-bucket/tmp/pinot/output");
+    Path bootstrapOutputPath = SegmentCreationMapper.getBootstrapOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(bootstrapOutputPath.toString(),
+        "s3a://sample-s3-bucket/tmp/pinot/output");
+
+    baseDir = new Path("s3a://sample-s3-bucket/tmp/pinot/input").toUri();
+    inputFile = new Path("s3a://sample-s3-bucket/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro").toUri();
+    outputPath = new Path("s3a://sample-s3-bucket/tmp/pinot/output");
+    bootstrapOutputPath = SegmentCreationMapper.getBootstrapOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(bootstrapOutputPath.toString(),
+        "s3a://sample-s3-bucket/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+
+
+  @Test
+  public void testBootstrapOutputPathHdfs() {
+    URI baseDir = new Path("hdfs://raw-data/tmp/pinot/input").toUri();
+    URI inputFile = new Path("hdfs://raw-data/tmp/pinot/input/airlineStats_data.avro").toUri();
+    Path outputPath = new Path("hdfs://raw-data/tmp/pinot/output");
+    Path bootstrapOutputPath = SegmentCreationMapper.getBootstrapOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(bootstrapOutputPath.toString(),
+        "hdfs://raw-data/tmp/pinot/output");
+
+    baseDir = new Path("hdfs://raw-data/tmp/pinot/input").toUri();
+    inputFile = new Path("hdfs://raw-data/tmp/pinot/input/yyyy=2019/mm=10/dd=18/airlineStats_data.avro").toUri();
+    outputPath = new Path("hdfs://raw-data/tmp/pinot/output");
+    bootstrapOutputPath = SegmentCreationMapper.getBootstrapOutputPath(baseDir, inputFile, outputPath);
+    Assert.assertEquals(bootstrapOutputPath.toString(),
+        "hdfs://raw-data/tmp/pinot/output/yyyy=2019/mm=10/dd=18");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org