You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/03/01 22:12:18 UTC

[incubator-pinot] branch hadoopParamFileFormat updated: Making record reader instantiation general

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch hadoopParamFileFormat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/hadoopParamFileFormat by this push:
     new 350c60b  Making record reader instantiation general
350c60b is described below

commit 350c60bbd2bafd6bbd2ad124dc1efb14509b27d0
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Fri Mar 1 14:12:06 2019 -0800

    Making record reader instantiation general
---
 .../core/data/readers/GenericRowRecordReader.java  |  6 +++++
 .../readers/MultiplePinotSegmentRecordReader.java  |  6 +++++
 .../data/readers/PinotSegmentRecordReader.java     |  6 +++++
 .../pinot/core/data/readers/RecordReader.java      |  2 +-
 .../core/data/readers/RecordReaderFactory.java     | 28 +++++++++++++++-------
 .../core/data/readers/ThriftRecordReader.java      |  6 +++++
 .../pinot/core/minion/BackfillDateTimeColumn.java  |  5 ++++
 .../apache/pinot/core/minion/SegmentPurger.java    |  5 ++++
 .../core/minion/segment/MapperRecordReader.java    |  6 +++++
 .../core/minion/segment/ReducerRecordReader.java   |  6 +++++
 .../converter/RealtimeSegmentRecordReader.java     |  6 +++++
 11 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index 8fb1bbd..5615f3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -41,6 +42,11 @@ public class GenericRowRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     return _nextRowId < _numRows;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 9489719..6448a63 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -53,6 +54,11 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
     this(indexDirs, null, null);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   /**
    * Read records using the passed in schema and in the order of sorted column from multiple pinot segments.
    * <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index bd3fd6f..50c1808 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.sort.PinotSegmentSorter;
 import org.apache.pinot.core.data.readers.sort.SegmentSorter;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -59,6 +60,11 @@ public class PinotSegmentRecordReader implements RecordReader {
     this(indexDir, null, null);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   /**
    * Read records using the segment schema with the given schema and sort order
    * <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
index 8cd3a09..ceff9d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
@@ -32,7 +32,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 public interface RecordReader extends Closeable {
 
   /**
-   * A temporary method only for the ORCRecordReader to allow instantiation through reflection.
+   * Initializes the record reader when needed
    */
   void init(SegmentGeneratorConfig segmentGeneratorConfig);
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index e0b5922..9234275 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -22,9 +22,14 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class RecordReaderFactory {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderFactory.class);
+
   private RecordReaderFactory() {
   }
 
@@ -36,6 +41,21 @@ public class RecordReaderFactory {
     Schema schema = segmentGeneratorConfig.getSchema();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
     String recordReaderPath = segmentGeneratorConfig.getRecordReaderPath();
+
+    // Allow instantiating general record readers from a record reader path set in the segment generator config.
+    // If the record reader path is set, it overrides the file format.
+    if (recordReaderPath != null) {
+      if (fileFormat != null) {
+        // The segment generator config currently sets the default file format to AVRO,
+        // and we do not want to break this behavior for clients.
+        LOGGER.warn("Using recordReaderPath {} to read segment, ignoring fileformat {}",
+            recordReaderPath, fileFormat.toString());
+      }
+      RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
+      recordReader.init(segmentGeneratorConfig);
+      return recordReader;
+    }
+
     switch (fileFormat) {
       case AVRO:
       case GZIPPED_AVRO:
@@ -49,14 +69,6 @@ public class RecordReaderFactory {
       case THRIFT:
         return new ThriftRecordReader(dataFile, schema,
             (ThriftRecordReaderConfig) segmentGeneratorConfig.getReaderConfig());
-      case ORC:
-        // The ORC reader currently uses hive, we don't want to bring this dependency into pinot-core
-        if (recordReaderPath == null) {
-          throw new RuntimeException("Record reader path must be set for ORC");
-        }
-        RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
-        recordReader.init(segmentGeneratorConfig);
-        return recordReader;
       default:
         throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
index 2de79d0..a89337c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TFieldIdEnum;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -75,6 +76,11 @@ public class ThriftRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     _bufferIn.mark(1);
     int val = 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index ead3305..9d786b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -136,6 +136,11 @@ public class BackfillDateTimeColumn {
     }
 
     @Override
+    public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+    }
+
+    @Override
     public boolean hasNext() {
       return _baseRecordReader.hasNext();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 90e32d3..9581310 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -151,6 +151,11 @@ public class SegmentPurger {
     }
 
     @Override
+    public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+    }
+
+    @Override
     public boolean hasNext() {
       if (_recordPurger == null) {
         return _recordReader.hasNext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 609a1f2..42abbd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
 import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -54,6 +55,11 @@ public class MapperRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     if (_finished) {
       return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 2fbccb6..80e6507 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -51,6 +52,11 @@ public class ReducerRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     if (_finished) {
       return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index dc35c1d..235ed00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.converter;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 
 
@@ -49,6 +50,11 @@ public class RealtimeSegmentRecordReader implements RecordReader {
     _sortedDocIdIterationOrder = realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   public int[] getSortedDocIdIterationOrder() {
     return _sortedDocIdIterationOrder;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org