You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/03/01 22:12:18 UTC
[incubator-pinot] branch hadoopParamFileFormat updated: Making record reader instantiation general
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch hadoopParamFileFormat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/hadoopParamFileFormat by this push:
new 350c60b Making record reader instantiation general
350c60b is described below
commit 350c60bbd2bafd6bbd2ad124dc1efb14509b27d0
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Fri Mar 1 14:12:06 2019 -0800
Making record reader instantiation general
---
.../core/data/readers/GenericRowRecordReader.java | 6 +++++
.../readers/MultiplePinotSegmentRecordReader.java | 6 +++++
.../data/readers/PinotSegmentRecordReader.java | 6 +++++
.../pinot/core/data/readers/RecordReader.java | 2 +-
.../core/data/readers/RecordReaderFactory.java | 28 +++++++++++++++-------
.../core/data/readers/ThriftRecordReader.java | 6 +++++
.../pinot/core/minion/BackfillDateTimeColumn.java | 5 ++++
.../apache/pinot/core/minion/SegmentPurger.java | 5 ++++
.../core/minion/segment/MapperRecordReader.java | 6 +++++
.../core/minion/segment/ReducerRecordReader.java | 6 +++++
.../converter/RealtimeSegmentRecordReader.java | 6 +++++
11 files changed, 73 insertions(+), 9 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index 8fb1bbd..5615f3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -41,6 +42,11 @@ public class GenericRowRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
return _nextRowId < _numRows;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 9489719..6448a63 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -53,6 +54,11 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
this(indexDirs, null, null);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
/**
* Read records using the passed in schema and in the order of sorted column from multiple pinot segments.
* <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index bd3fd6f..50c1808 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.sort.PinotSegmentSorter;
import org.apache.pinot.core.data.readers.sort.SegmentSorter;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -59,6 +60,11 @@ public class PinotSegmentRecordReader implements RecordReader {
this(indexDir, null, null);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
/**
* Read records using the segment schema with the given schema and sort order
* <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
index 8cd3a09..ceff9d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
@@ -32,7 +32,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
public interface RecordReader extends Closeable {
/**
- * A temporary method only for the ORCRecordReader to allow instantiation through reflection.
+ * Initializes the record reader when needed
*/
void init(SegmentGeneratorConfig segmentGeneratorConfig);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index e0b5922..9234275 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -22,9 +22,14 @@ import com.google.common.base.Preconditions;
import java.io.File;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RecordReaderFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderFactory.class);
+
private RecordReaderFactory() {
}
@@ -36,6 +41,21 @@ public class RecordReaderFactory {
Schema schema = segmentGeneratorConfig.getSchema();
FileFormat fileFormat = segmentGeneratorConfig.getFormat();
String recordReaderPath = segmentGeneratorConfig.getRecordReaderPath();
+
+ // Allow for instantiation general record readers from a record reader path passed into segment generator config
+ // If this is set, this will override the file format
+ if (recordReaderPath != null) {
+ if (fileFormat != null) {
+ // We currently have default file format set to AVRO inside segment generator config,
+ // do not want to break this behavior for clients.
+ LOGGER.warn("Using recordReaderPath {} to read segment, ignoring fileformat {}",
+ recordReaderPath, fileFormat.toString());
+ }
+ RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
+ recordReader.init(segmentGeneratorConfig);
+ return recordReader;
+ }
+
switch (fileFormat) {
case AVRO:
case GZIPPED_AVRO:
@@ -49,14 +69,6 @@ public class RecordReaderFactory {
case THRIFT:
return new ThriftRecordReader(dataFile, schema,
(ThriftRecordReaderConfig) segmentGeneratorConfig.getReaderConfig());
- case ORC:
- // The ORC reader currently uses hive, we don't want to bring this dependency into pinot-core
- if (recordReaderPath == null) {
- throw new RuntimeException("Record reader path must be set for ORC");
- }
- RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
- recordReader.init(segmentGeneratorConfig);
- return recordReader;
default:
throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
index 2de79d0..a89337c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -75,6 +76,11 @@ public class ThriftRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
_bufferIn.mark(1);
int val = 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index ead3305..9d786b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -136,6 +136,11 @@ public class BackfillDateTimeColumn {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
return _baseRecordReader.hasNext();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 90e32d3..9581310 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -151,6 +151,11 @@ public class SegmentPurger {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
if (_recordPurger == null) {
return _recordReader.hasNext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 609a1f2..42abbd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -54,6 +55,11 @@ public class MapperRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
if (_finished) {
return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 2fbccb6..80e6507 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -51,6 +52,11 @@ public class ReducerRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
+ @Override
public boolean hasNext() {
if (_finished) {
return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index dc35c1d..235ed00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.converter;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -49,6 +50,11 @@ public class RealtimeSegmentRecordReader implements RecordReader {
_sortedDocIdIterationOrder = realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+ }
+
public int[] getSortedDocIdIterationOrder() {
return _sortedDocIdIterationOrder;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org