You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/03/01 19:16:10 UTC

[incubator-pinot] branch hadoopParamFileFormat updated (20fd224 -> f0e8395)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch hadoopParamFileFormat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 20fd224  Adding orc configurability
     new f0e8395  Adding orc configurability in hadoop jobs

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (20fd224)
            \
             N -- N -- N   refs/heads/hadoopParamFileFormat (f0e8395)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pinot/core/data/readers/AvroRecordReader.java |  5 +++++
 .../org/apache/pinot/core/data/readers/CSVRecordReader.java  | 11 ++++++++---
 .../org/apache/pinot/core/data/readers/JSONRecordReader.java | 12 +++++++++---
 .../org/apache/pinot/core/data/readers/RecordReader.java     |  5 +++++
 .../apache/pinot/core/data/readers/RecordReaderFactory.java  |  8 ++++++++
 .../core/indexsegment/generator/SegmentGeneratorConfig.java  | 10 ++++++++++
 6 files changed, 45 insertions(+), 6 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding orc configurability in hadoop jobs

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch hadoopParamFileFormat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f0e839560a6d2f89f0ce13483f8bba0f9078eb31
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Fri Mar 1 11:02:47 2019 -0800

    Adding orc configurability in hadoop jobs
---
 .../org/apache/pinot/core/data/readers/AvroRecordReader.java |  5 +++++
 .../org/apache/pinot/core/data/readers/CSVRecordReader.java  | 11 ++++++++---
 .../java/org/apache/pinot/core/data/readers/FileFormat.java  |  2 +-
 .../org/apache/pinot/core/data/readers/JSONRecordReader.java | 12 +++++++++---
 .../org/apache/pinot/core/data/readers/RecordReader.java     |  5 +++++
 .../apache/pinot/core/data/readers/RecordReaderFactory.java  |  8 ++++++++
 .../core/indexsegment/generator/SegmentGeneratorConfig.java  | 10 ++++++++++
 .../java/org/apache/pinot/hadoop/job/JobConfigConstants.java |  1 +
 .../pinot/hadoop/job/mapper/SegmentCreationMapper.java       |  7 +++++++
 9 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index 6c21551..084a7a2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -57,6 +57,11 @@ public class AvroRecordReader implements RecordReader {
     }
   }
 
+  @Override
+  public void init() {
+
+  }
+
   private void validateSchema() {
     org.apache.avro.Schema avroSchema = _avroReader.getSchema();
     for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
index 2204acd..462a1e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
@@ -86,10 +86,15 @@ public class CSVRecordReader implements RecordReader {
       _multiValueDelimiter = config.getMultiValueDelimiter();
     }
 
-    init();
+    initialize();
   }
 
-  private void init()
+  @Override
+  public void init() {
+
+  }
+
+  private void initialize()
       throws IOException {
     _parser = _format.parse(RecordReaderUtils.getFileReader(_dataFile));
     _iterator = _parser.iterator();
@@ -131,7 +136,7 @@ public class CSVRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parser.close();
-    init();
+    initialize();
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 0826cd5..2d3149c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.core.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, ORC
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
index 2770c7b..72c47d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
@@ -42,15 +42,16 @@ public class JSONRecordReader implements RecordReader {
   private JsonParser _parser;
   private Iterator<Map> _iterator;
 
+
   public JSONRecordReader(File dataFile, Schema schema)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
 
-    init();
+    initialize();
   }
 
-  private void init()
+  private void initialize()
       throws IOException {
     _parser = _factory.createParser(RecordReaderUtils.getFileReader(_dataFile));
     try {
@@ -62,6 +63,11 @@ public class JSONRecordReader implements RecordReader {
   }
 
   @Override
+  public void init() {
+
+  }
+
+  @Override
   public boolean hasNext() {
     return _iterator.hasNext();
   }
@@ -97,7 +103,7 @@ public class JSONRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parser.close();
-    init();
+    initialize();
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
index 8f656a7..dffee91 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
@@ -31,6 +31,11 @@ import org.apache.pinot.core.data.GenericRow;
 public interface RecordReader extends Closeable {
 
   /**
+   * Initializes record reader
+   */
+  void init();
+
+  /**
    * Return <code>true</code> if more records remain to be read.
    */
   boolean hasNext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index 096f571..1305e3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -35,6 +35,7 @@ public class RecordReaderFactory {
 
     Schema schema = segmentGeneratorConfig.getSchema();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
+    String recordReaderPath = segmentGeneratorConfig.getRecordReaderPath();
     switch (fileFormat) {
       case AVRO:
       case GZIPPED_AVRO:
@@ -48,6 +49,13 @@ public class RecordReaderFactory {
       case THRIFT:
         return new ThriftRecordReader(dataFile, schema,
             (ThriftRecordReaderConfig) segmentGeneratorConfig.getReaderConfig());
+      case ORC:
+        if (recordReaderPath == null) {
+          throw new RuntimeException("Record reader path must be set for ORC");
+        }
+        RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
+        recordReader.init();
+        return recordReader;
       default:
         throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 40024dc..faadca1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -82,6 +82,7 @@ public class SegmentGeneratorConfig {
   private String _dataDir = null;
   private String _inputFilePath = null;
   private FileFormat _format = FileFormat.AVRO;
+  private String _recordReaderPath = null;
   private String _outDir = null;
   private boolean _overwrite = false;
   private String _tableName = null;
@@ -155,6 +156,7 @@ public class SegmentGeneratorConfig {
     _timeColumnType = config._timeColumnType;
     _simpleDateFormat = config._simpleDateFormat;
     _onHeap = config._onHeap;
+    _recordReaderPath = config._recordReaderPath;
   }
 
   /**
@@ -324,6 +326,14 @@ public class SegmentGeneratorConfig {
     _format = format;
   }
 
+  public String getRecordReaderPath() {
+    return _recordReaderPath;
+  }
+
+  public void setRecordReaderPath(String recordReaderPath) {
+    _recordReaderPath = recordReaderPath;
+  }
+
   public String getOutDir() {
     return _outDir;
   }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 1a50c5c..60634e0 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -49,4 +49,5 @@ public class JobConfigConstants {
   public static final String PUSH_TO_PORT = "push.to.port";
 
   public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
+  public static final String INPUT_FILE_FORMAT = "input.file.format";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index 4258e96..043bb2a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -68,6 +68,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
 
   // Optional
   protected TableConfig _tableConfig;
+  protected String _inputFileFormat;
   protected Path _readerConfigFile;
 
   // HDFS segment tar directory
@@ -102,6 +103,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       _readerConfigFile = new Path(readerConfigFile);
     }
 
+    _inputFileFormat = _jobConf.get(JobConfigConstants.INPUT_FILE_FORMAT, null);
+
     // Set up segment name generator
     String segmentNameGeneratorType =
         _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
@@ -235,6 +238,10 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   }
 
   protected FileFormat getFileFormat(String fileName) {
+    // ORC files do not necessarily have the .orc file extension
+    if (_inputFileFormat != null && _inputFileFormat.equalsIgnoreCase(FileFormat.ORC.toString())) {
+      return FileFormat.ORC;
+    }
     if (fileName.endsWith(".avro")) {
       return FileFormat.AVRO;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org