Posted to commits@drill.apache.org by me...@apache.org on 2015/09/18 01:14:45 UTC
drill git commit: DRILL-1942-readers: - add extends AutoCloseable to RecordReader, and rename cleanup() to close(). - fix many warnings - formatting fixes
Repository: drill
Updated Branches:
refs/heads/master 5fab01fab -> e52d473eb
DRILL-1942-readers:
- add extends AutoCloseable to RecordReader, and rename cleanup() to close().
- fix many warnings
- formatting fixes
DRILL-1942-readers:
- renamed cleanup() to close() in the new JdbcRecordReader
Close apache/drill#154
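
In effect, the commit turns the reader lifecycle method into the standard Java close() contract. A minimal before/after sketch of the interface change (simplified from the RecordReader diff below; allocate() and the constants are omitted here):

    // Before: readers exposed a Drill-specific cleanup() method.
    public interface RecordReader {
      void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
      int next();
      void cleanup();
    }

    // After: the interface extends AutoCloseable, so cleanup() becomes close(),
    // which may throw Exception and can be used with try-with-resources.
    public interface RecordReader extends AutoCloseable {
      void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
      int next();
      // close() is inherited from AutoCloseable
    }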
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e52d473e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e52d473e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e52d473e
Branch: refs/heads/master
Commit: e52d473eb465699bf145cac70662809f1feae78e
Parents: 5fab01f
Author: Chris Westin <cw...@yahoo.com>
Authored: Thu Sep 10 18:28:00 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Thu Sep 17 13:52:11 2015 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseRecordReader.java | 40 +++++-----
.../drill/exec/store/hive/HiveRecordReader.java | 5 +-
.../drill/exec/store/jdbc/JdbcRecordReader.java | 2 +-
.../exec/store/mongo/MongoRecordReader.java | 4 +-
.../drill/exec/physical/impl/ScanBatch.java | 82 ++++++++++----------
.../apache/drill/exec/store/RecordReader.java | 15 ++--
.../drill/exec/store/avro/AvroRecordReader.java | 2 +-
.../exec/store/easy/json/JSONRecordReader.java | 33 ++++----
.../compliant/CompliantTextRecordReader.java | 2 +-
.../drill/exec/store/mock/MockRecordReader.java | 51 ++++--------
.../columnreaders/ParquetRecordReader.java | 44 +++++------
.../exec/store/parquet2/DrillParquetReader.java | 36 ++++-----
.../drill/exec/store/pojo/PojoRecordReader.java | 6 +-
.../exec/store/text/DrillTextRecordReader.java | 40 ++++------
.../exec/store/json/TestJsonRecordReader.java | 16 ++--
.../store/parquet/ParquetRecordReaderTest.java | 11 +--
16 files changed, 160 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 9458db2..ba10592 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -31,9 +31,9 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -75,7 +75,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
private boolean rowKeyOnly;
public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
- List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
+ List<SchemaPath> projectedColumns, FragmentContext context) {
hbaseConf = conf;
hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName();
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
@@ -169,15 +169,16 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
done:
for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
Result result = null;
+ final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
try {
- if (operatorContext != null) {
- operatorContext.getStats().startWait();
+ if (operatorStats != null) {
+ operatorStats.startWait();
}
try {
result = resultScanner.next();
} finally {
- if (operatorContext != null) {
- operatorContext.getStats().stopWait();
+ if (operatorStats != null) {
+ operatorStats.stopWait();
}
}
} catch (IOException e) {
@@ -193,20 +194,20 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
}
if (!rowKeyOnly) {
- for (Cell cell : cells) {
- int familyOffset = cell.getFamilyOffset();
- int familyLength = cell.getFamilyLength();
- byte[] familyArray = cell.getFamilyArray();
- MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
+ for (final Cell cell : cells) {
+ final int familyOffset = cell.getFamilyOffset();
+ final int familyLength = cell.getFamilyLength();
+ final byte[] familyArray = cell.getFamilyArray();
+ final MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
- int qualifierOffset = cell.getQualifierOffset();
- int qualifierLength = cell.getQualifierLength();
- byte[] qualifierArray = cell.getQualifierArray();
- NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
+ final int qualifierOffset = cell.getQualifierOffset();
+ final int qualifierLength = cell.getQualifierLength();
+ final byte[] qualifierArray = cell.getQualifierArray();
+ final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
- int valueOffset = cell.getValueOffset();
- int valueLength = cell.getValueLength();
- byte[] valueArray = cell.getValueArray();
+ final int valueOffset = cell.getValueOffset();
+ final int valueLength = cell.getValueLength();
+ final byte[] valueArray = cell.getValueArray();
v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
}
}
@@ -246,7 +247,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
}
@Override
- public void cleanup() {
+ public void close() {
try {
if (resultScanner != null) {
resultScanner.close();
@@ -267,5 +268,4 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
rowKeyVector.getMutator().setValueCount(count);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e87ec6..dc1bae3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -133,7 +133,7 @@ public class HiveRecordReader extends AbstractRecordReader {
String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
try {
format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
- Class c = Class.forName(sLib);
+ Class<?> c = Class.forName(sLib);
serde = (SerDe) c.getConstructor().newInstance();
serde.initialize(job, properties);
} catch (ReflectiveOperationException | SerDeException e) {
@@ -286,7 +286,6 @@ public class HiveRecordReader extends AbstractRecordReader {
}
private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
- boolean success;
for (int i = 0; i < selectedColumnNames.size(); i++) {
String columnName = selectedColumnNames.get(i);
Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
@@ -311,7 +310,7 @@ public class HiveRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
try {
if (reader != null) {
reader.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 69c45c2..463ae67 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -229,7 +229,7 @@ class JdbcRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
AutoCloseables.close(resultSet, logger);
AutoCloseables.close(statement, logger);
AutoCloseables.close(connection, logger);
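
The JDBC reader's close() delegates to Drill's AutoCloseables helper, which (as used here) closes each resource in turn and logs a failure rather than rethrowing it. A hedged sketch of that close-and-log pattern, in case the helper is unfamiliar (illustrative only, not the library source):

    // Close a single resource, logging instead of propagating failures,
    // so one bad close does not prevent the remaining resources from closing.
    static void closeAndLog(final AutoCloseable resource, final org.slf4j.Logger logger) {
      if (resource == null) {
        return;
      }
      try {
        resource.close();
      } catch (final Exception e) {
        logger.warn("Failure while closing resource", e);
      }
    }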
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index c8b0699..dd5fcdf 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -52,7 +52,7 @@ import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
public class MongoRecordReader extends AbstractRecordReader {
- static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
private MongoCollection<Document> collection;
private MongoCursor<Document> cursor;
@@ -187,7 +187,7 @@ public class MongoRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 873ae76..1ac4f7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -92,47 +92,49 @@ public class ScanBatch implements CloseableRecordBatch {
if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
}
- this.currentReader = readers.next();
+ currentReader = readers.next();
this.oContext = oContext;
boolean setup = false;
try {
oContext.getStats().startProcessing();
- this.currentReader.setup(oContext, mutator);
+ currentReader.setup(oContext, mutator);
setup = true;
} finally {
// if we had an exception during setup, make sure to release existing data.
if (!setup) {
- currentReader.cleanup();
+ try {
+ currentReader.close();
+ } catch(final Exception e) {
+ throw new ExecutionSetupException(e);
+ }
}
oContext.getStats().stopProcessing();
}
this.partitionColumns = partitionColumns.iterator();
- this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
+ partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;
// TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
// options; so labelValue = null.
final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
- this.partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
+ partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
addPartitionVectors();
}
- public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers)
+ throws ExecutionSetupException {
this(subScanConfig, context,
context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}
+ @Override
public FragmentContext getContext() {
return context;
}
- public OperatorContext getOperatorContext() {
- return oContext;
- }
-
@Override
public BatchSchema getSchema() {
return schema;
@@ -156,6 +158,12 @@ public class ScanBatch implements CloseableRecordBatch {
container.zeroVectors();
}
+ private void clearFieldVectorMap() {
+ for (final ValueVector v : fieldVectorMap.values()) {
+ v.clear();
+ }
+ }
+
@Override
public IterOutcome next() {
if (done) {
@@ -169,15 +177,13 @@ public class ScanBatch implements CloseableRecordBatch {
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException | OutOfMemoryRuntimeException e) {
logger.debug("Caught Out of Memory Exception", e);
- for (ValueVector v : fieldVectorMap.values()) {
- v.clear();
- }
+ clearFieldVectorMap();
return IterOutcome.OUT_OF_MEMORY;
}
while ((recordCount = currentReader.next()) == 0) {
try {
if (!readers.hasNext()) {
- currentReader.cleanup();
+ currentReader.close();
releaseAssets();
done = true;
if (mutator.isNewSchema()) {
@@ -196,7 +202,7 @@ public class ScanBatch implements CloseableRecordBatch {
fieldVectorMap.clear();
}
- currentReader.cleanup();
+ currentReader.close();
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
currentReader.setup(oContext, mutator);
@@ -204,9 +210,7 @@ public class ScanBatch implements CloseableRecordBatch {
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException e) {
logger.debug("Caught OutOfMemoryException");
- for (ValueVector v : fieldVectorMap.values()) {
- v.clear();
- }
+ clearFieldVectorMap();
return IterOutcome.OUT_OF_MEMORY;
}
addPartitionVectors();
@@ -249,7 +253,7 @@ public class ScanBatch implements CloseableRecordBatch {
}
}
- private void addPartitionVectors() throws ExecutionSetupException{
+ private void addPartitionVectors() throws ExecutionSetupException {
try {
if (partitionVectors != null) {
for (ValueVector v : partitionVectors) {
@@ -258,8 +262,10 @@ public class ScanBatch implements CloseableRecordBatch {
}
partitionVectors = Lists.newArrayList();
for (int i : selectedPartitionColumns) {
- MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
- ValueVector v = mutator.addField(field, NullableVarCharVector.class);
+ final MaterializedField field =
+ MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
+ Types.optional(MinorType.VARCHAR));
+ final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
partitionVectors.add(v);
}
} catch(SchemaChangeException e) {
@@ -269,12 +275,12 @@ public class ScanBatch implements CloseableRecordBatch {
private void populatePartitionVectors() {
for (int index = 0; index < selectedPartitionColumns.size(); index++) {
- int i = selectedPartitionColumns.get(index);
- NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
+ final int i = selectedPartitionColumns.get(index);
+ final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
if (partitionValues.length > i) {
- String val = partitionValues[i];
+ final String val = partitionValues[i];
AllocationHelper.allocate(v, recordCount, val.length());
- byte[] bytes = val.getBytes();
+ final byte[] bytes = val.getBytes();
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
}
@@ -306,27 +312,24 @@ public class ScanBatch implements CloseableRecordBatch {
return container.getValueAccessorById(clazz, ids);
}
-
-
private class Mutator implements OutputMutator {
+ private boolean schemaChange = true;
- boolean schemaChange = true;
-
- @SuppressWarnings("unchecked")
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
// Check if the field exists
ValueVector v = fieldVectorMap.get(field.key());
-
if (v == null || v.getClass() != clazz) {
// Field does not exist add it to the map and the output container
v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
if (!clazz.isAssignableFrom(v.getClass())) {
- throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ throw new SchemaChangeException(String.format(
+ "The class that was provided %s does not correspond to the expected vector type of %s.",
+ clazz.getSimpleName(), v.getClass().getSimpleName()));
}
- ValueVector old = fieldVectorMap.put(field.key(), v);
- if(old != null){
+ final ValueVector old = fieldVectorMap.put(field.key(), v);
+ if (old != null) {
old.clear();
container.remove(old);
}
@@ -336,12 +339,12 @@ public class ScanBatch implements CloseableRecordBatch {
schemaChange = true;
}
- return (T) v;
+ return clazz.cast(v);
}
@Override
public void allocate(int recordCount) {
- for (ValueVector v : fieldVectorMap.values()) {
+ for (final ValueVector v : fieldVectorMap.values()) {
AllocationHelper.allocate(v, recordCount, 50, 10);
}
}
@@ -378,18 +381,17 @@ public class ScanBatch implements CloseableRecordBatch {
}
@Override
- public void close() {
+ public void close() throws Exception {
container.clear();
- for (ValueVector v : partitionVectors) {
+ for (final ValueVector v : partitionVectors) {
v.clear();
}
fieldVectorMap.clear();
- currentReader.cleanup();
+ currentReader.close();
}
@Override
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
-
}
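
Note that ScanBatch.close() now declares throws Exception (inherited through AutoCloseable), so whatever tears down the operator tree has to tolerate a checked exception from it. A minimal hedged sketch of such a caller (the batches collection is hypothetical, not code from this commit):

    // Close every batch even if one close() fails, then surface the first failure.
    Exception firstFailure = null;
    for (final CloseableRecordBatch batch : batches) {
      try {
        batch.close();
      } catch (final Exception e) {
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }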
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 61ccac5..c2ab0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,30 +26,27 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.vector.ValueVector;
-public interface RecordReader {
-
+public interface RecordReader extends AutoCloseable {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
/**
* Configure the RecordReader with the provided schema and the record batch that should be written to.
*
+ * @param context operator context for the reader
* @param output
* The place where output for a particular scan should be written. The record reader is responsible for
* mutating the set of schema values for that particular record.
* @throws ExecutionSetupException
*/
- public abstract void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
+ void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
- public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
+ void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
/**
* Increment record reader forward, writing into the provided output batch.
*
* @return The number of additional records added to the output.
*/
- public abstract int next();
-
- public abstract void cleanup();
-
-}
\ No newline at end of file
+ int next();
+}
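
With RecordReader extending AutoCloseable, a reader used outside ScanBatch's lifecycle can also be managed with try-with-resources. A hypothetical usage sketch (SomeRecordReader stands in for any concrete reader and is not a class from this commit):

    // The reader is closed automatically when the block exits, even on failure,
    // replacing the old explicit cleanup() call.
    try (final RecordReader reader = new SomeRecordReader(config)) {
      reader.setup(operatorContext, outputMutator);
      int recordCount;
      while ((recordCount = reader.next()) > 0) {
        // consume the records written into the output mutator's vectors
      }
    }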
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index a09cd53..210ed9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -351,7 +351,7 @@ public class AvroRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
if (reader != null) {
try {
reader.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 8e78cf1..4d51199 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -57,7 +57,6 @@ public class JSONRecordReader extends AbstractRecordReader {
private int recordCount;
private long runningRecordCount = 0;
private final FragmentContext fragmentContext;
- private OperatorContext operatorContext;
private final boolean enableAllTextMode;
private final boolean readNumbersAsDouble;
@@ -82,13 +81,14 @@ public class JSONRecordReader extends AbstractRecordReader {
* @param columns
* @throws OutOfMemoryException
*/
- public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
- final List<SchemaPath> columns) throws OutOfMemoryException {
+ public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
+ final DrillFileSystem fileSystem, final List<SchemaPath> columns) throws OutOfMemoryException {
this(fragmentContext, null, embeddedContent, fileSystem, columns);
}
- private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
- final List<SchemaPath> columns) throws OutOfMemoryException {
+ private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
+ final JsonNode embeddedContent, final DrillFileSystem fileSystem,
+ final List<SchemaPath> columns) {
Preconditions.checkArgument(
(inputPath == null && embeddedContent != null) ||
@@ -96,9 +96,9 @@ public class JSONRecordReader extends AbstractRecordReader {
"One of inputPath or embeddedContent must be set but not both."
);
- if(inputPath != null){
+ if(inputPath != null) {
this.hadoopPath = new Path(inputPath);
- }else{
+ } else {
this.embeddedContent = embeddedContent;
}
@@ -113,7 +113,6 @@ public class JSONRecordReader extends AbstractRecordReader {
@Override
public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
- this.operatorContext = context;
try{
if (hadoopPath != null) {
this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
@@ -131,7 +130,7 @@ public class JSONRecordReader extends AbstractRecordReader {
}
}
- private void setupParser() throws IOException{
+ private void setupParser() throws IOException {
if(hadoopPath != null){
jsonReader.setSource(stream);
}else{
@@ -177,11 +176,11 @@ public class JSONRecordReader extends AbstractRecordReader {
ReadState write = null;
// Stopwatch p = new Stopwatch().start();
try{
- outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION){
+ outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
writer.setPosition(recordCount);
write = jsonReader.write(writer);
- if(write == ReadState.WRITE_SUCCEED){
+ if(write == ReadState.WRITE_SUCCEED) {
// logger.debug("Wrote record.");
recordCount++;
}else{
@@ -198,7 +197,6 @@ public class JSONRecordReader extends AbstractRecordReader {
// System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
updateRunningCount();
-
return recordCount;
} catch (final Exception e) {
@@ -213,14 +211,9 @@ public class JSONRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
- try {
- if(stream != null){
- stream.close();
- }
- } catch (final IOException e) {
- logger.warn("Failure while closing stream.", e);
+ public void close() throws Exception {
+ if(stream != null) {
+ stream.close();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index ae11ba7..ad65a94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -144,7 +144,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
* This would internally close the input stream we are reading from.
*/
@Override
- public void cleanup() {
+ public void close() {
try {
if (reader != null) {
reader.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index fd97c48..c742690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
@@ -38,23 +37,19 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
public class MockRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
- private OutputMutator output;
- private MockScanEntry config;
- private FragmentContext context;
- private BufferAllocator alcator;
+ private final MockScanEntry config;
+ private final FragmentContext context;
private ValueVector[] valueVectors;
private int recordsRead;
private int batchRecordCount;
- private FragmentContext fragmentContext;
private OperatorContext operatorContext;
- public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException {
+ public MockRecordReader(FragmentContext context, MockScanEntry config) {
this.context = context;
this.config = config;
- this.fragmentContext=context;
}
private int getEstimatedRecordSize(MockColumn[] types) {
@@ -67,38 +62,26 @@ public class MockRecordReader extends AbstractRecordReader {
private MaterializedField getVector(String name, MajorType type, int length) {
assert context != null : "Context shouldn't be null.";
- MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
-
+ final MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
return f;
-
- }
-
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
}
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
try {
- this.output = output;
- int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+ final int estimateRowSize = getEstimatedRecordSize(config.getTypes());
valueVectors = new ValueVector[config.getTypes().length];
batchRecordCount = 250000 / estimateRowSize;
for (int i = 0; i < config.getTypes().length; i++) {
- MajorType type = config.getTypes()[i].getMajorType();
- MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
- Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+ final MajorType type = config.getTypes()[i].getMajorType();
+ final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+ final Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
valueVectors[i] = output.addField(field, vvClass);
}
} catch (SchemaChangeException e) {
throw new ExecutionSetupException("Failure while setting up fields", e);
}
-
}
@Override
@@ -107,23 +90,20 @@ public class MockRecordReader extends AbstractRecordReader {
return 0;
}
- int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
-
+ final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
recordsRead += recordSetSize;
- for (ValueVector v : valueVectors) {
-
-// logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
- ValueVector.Mutator m = v.getMutator();
+ for (final ValueVector v : valueVectors) {
+ final ValueVector.Mutator m = v.getMutator();
m.generateTestData(recordSetSize);
-
}
+
return recordSetSize;
}
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
try {
- for (ValueVector v : vectorMap.values()) {
+ for (final ValueVector v : vectorMap.values()) {
AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
}
} catch (NullPointerException e) {
@@ -132,7 +112,6 @@ public class MockRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index da6fbfb..a4f5cac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -67,9 +67,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
// TODO - should probably find a smarter way to set this, currently 1 megabyte
- private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
- private static final String SEPERATOR = System.getProperty("file.separator");
// used for clearing the last n bits of a byte
public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
@@ -79,16 +77,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
private int bitWidthAllFixedFields;
private boolean allFieldsFixedLength;
private int recordsPerBatch;
+ private OperatorContext operatorContext;
// private long totalRecords;
// private long rowGroupOffset;
- private List<ColumnReader> columnStatuses;
+ private List<ColumnReader<?>> columnStatuses;
private FileSystem fileSystem;
private long batchSize;
Path hadoopPath;
private VarLenBinaryReader varLengthReader;
private ParquetMetadata footer;
- private OperatorContext operatorContext;
// This is a parallel list to the columns list above, it is used to determine the subset of the project
// pushdown columns that do not appear in this file
private boolean[] columnsFound;
@@ -160,14 +158,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
return batchSize;
}
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
-
/**
* @param type a fixed length type from the parquet library enum
* @return the length in pageDataByteArray of the type
@@ -205,9 +195,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
return false;
}
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
@Override
- public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
- this.operatorContext = context;
+ public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+ this.operatorContext = operatorContext;
if (!isStarQuery()) {
columnsFound = new boolean[getColumns().size()];
nullFilledVectors = new ArrayList<>();
@@ -276,7 +270,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
try {
ValueVector vector;
SchemaElement schemaElement;
- ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+ final ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
// initialize all of the column read status objects
boolean fieldFixedLength;
for (int i = 0; i < columns.size(); ++i) {
@@ -342,7 +336,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
try {
- for (ValueVector v : vectorMap.values()) {
+ for (final ValueVector v : vectorMap.values()) {
AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
}
} catch (NullPointerException e) {
@@ -366,17 +360,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
private void resetBatch() {
- for (ColumnReader column : columnStatuses) {
+ for (final ColumnReader<?> column : columnStatuses) {
column.valuesReadInCurrentPass = 0;
}
- for (VarLengthColumn r : varLengthReader.columns) {
+ for (final VarLengthColumn<?> r : varLengthReader.columns) {
r.valuesReadInCurrentPass = 0;
}
}
public void readAllFixedFields(long recordsToRead) throws IOException {
- for (ColumnReader crs : columnStatuses) {
+ for (ColumnReader<?> crs : columnStatuses) {
crs.processPages(recordsToRead);
}
}
@@ -386,7 +380,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
resetBatch();
long recordsToRead = 0;
try {
- ColumnReader firstColumnStatus;
+ ColumnReader<?> firstColumnStatus;
if (columnStatuses.size() > 0) {
firstColumnStatus = columnStatuses.iterator().next();
}
@@ -404,7 +398,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
return 0;
}
recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
- for (ValueVector vv : nullFilledVectors ) {
+ for (final ValueVector vv : nullFilledVectors ) {
vv.getMutator().setValueCount( (int) recordsToRead);
}
mockRecordsRead += recordsToRead;
@@ -429,7 +423,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
// if we have requested columns that were not found in the file fill their vectors with null
// (by simply setting the value counts inside of them, as they start null filled)
if (nullFilledVectors != null) {
- for (ValueVector vv : nullFilledVectors ) {
+ for (final ValueVector vv : nullFilledVectors ) {
vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
}
}
@@ -451,13 +445,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
// enable this for debugging when it is know that a whole file will be read
// limit kills upstream operators once it has enough records, so this assert will fail
// assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
if (columnStatuses != null) {
- for (ColumnReader column : columnStatuses) {
+ for (final ColumnReader column : columnStatuses) {
column.clear();
}
columnStatuses.clear();
@@ -467,7 +461,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
codecFactory.close();
if (varLengthReader != null) {
- for (VarLengthColumn r : varLengthReader.columns) {
+ for (final VarLengthColumn r : varLengthReader.columns) {
r.clear();
}
varLengthReader.columns.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4c49def..01a9853 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -68,7 +68,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class DrillParquetReader extends AbstractRecordReader {
-
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
// same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
@@ -155,7 +154,7 @@ public class DrillParquetReader extends AbstractRecordReader {
}
// loop through projection columns and add any columns that are missing from parquet schema to columnsNotFound list
- outer: for (SchemaPath columnPath : modifiedColumns) {
+ for (SchemaPath columnPath : modifiedColumns) {
boolean notFound = true;
for (SchemaPath schemaPath : schemaPaths) {
if (schemaPath.contains(columnPath)) {
@@ -191,7 +190,7 @@ public class DrillParquetReader extends AbstractRecordReader {
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
try {
- for (ValueVector v : vectorMap.values()) {
+ for (final ValueVector v : vectorMap.values()) {
AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
}
} catch (NullPointerException e) {
@@ -216,7 +215,7 @@ public class DrillParquetReader extends AbstractRecordReader {
projection = schema;
}
if(columnsNotFound!=null && columnsNotFound.size()>0) {
- nullFilledVectors = new ArrayList();
+ nullFilledVectors = new ArrayList<>();
for(SchemaPath col: columnsNotFound){
nullFilledVectors.add(
(NullableIntVector)output.addField(MaterializedField.create(col,
@@ -234,7 +233,7 @@ public class DrillParquetReader extends AbstractRecordReader {
ColumnIOFactory factory = new ColumnIOFactory(false);
MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
- Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap();
+ Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap<>();
for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
paths.put(md.getPath(), md);
@@ -273,7 +272,7 @@ public class DrillParquetReader extends AbstractRecordReader {
}
protected void handleAndRaise(String s, Exception e) {
- cleanup();
+ close();
String message = "Error in drill parquet reader (complex).\nMessage: " + s +
"\nParquet Metadata: " + footer;
throw new DrillRuntimeException(message, e);
@@ -319,7 +318,7 @@ public class DrillParquetReader extends AbstractRecordReader {
// if we have requested columns that were not found in the file fill their vectors with null
// (by simply setting the value counts inside of them, as they start null filled)
if (nullFilledVectors != null) {
- for (ValueVector vv : nullFilledVectors ) {
+ for (final ValueVector vv : nullFilledVectors) {
vv.getMutator().setValueCount(count);
}
}
@@ -328,7 +327,7 @@ public class DrillParquetReader extends AbstractRecordReader {
private int getPercentFilled() {
int filled = 0;
- for (ValueVector v : primitiveVectors) {
+ for (final ValueVector v : primitiveVectors) {
filled = Math.max(filled, v.getAccessor().getValueCount() * 100 / v.getValueCapacity());
if (v instanceof VariableWidthVector) {
filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
@@ -343,7 +342,7 @@ public class DrillParquetReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
try {
if (pageReadStore != null) {
pageReadStore.close();
@@ -354,20 +353,13 @@ public class DrillParquetReader extends AbstractRecordReader {
}
}
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
+ static public class ProjectedColumnType {
+ public final String projectedColumnName;
+ public final MessageType type;
- static public class ProjectedColumnType{
- ProjectedColumnType(String projectedColumnName, MessageType type){
- this.projectedColumnName=projectedColumnName;
- this.type=type;
+ ProjectedColumnType(String projectedColumnName, MessageType type) {
+ this.projectedColumnName = projectedColumnName;
+ this.type = type;
}
-
- public String projectedColumnName;
- public MessageType type;
-
-
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 543121f..32fa31d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -123,7 +123,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
- for (ValueVector v : vectorMap.values()) {
+ for (final ValueVector v : vectorMap.values()) {
AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
}
}
@@ -146,7 +146,6 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
try {
int i =0;
- outside:
while (doCurrent || iterator.hasNext()) {
if (doCurrent) {
doCurrent = false;
@@ -175,7 +174,6 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index c59ade9..bc675af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -52,17 +52,14 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class DrillTextRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
- static final String COL_NAME = "columns";
+ private static final String COL_NAME = "columns";
private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader;
- private List<ValueVector> vectors = Lists.newArrayList();
+ private final List<ValueVector> vectors = Lists.newArrayList();
private byte delimiter;
- private int targetRecordCount;
private FieldReference ref = new FieldReference(COL_NAME);
- private FragmentContext fragmentContext;
- private OperatorContext operatorContext;
private RepeatedVarCharVector vector;
private List<Integer> columnIds = Lists.newArrayList();
private LongWritable key;
@@ -71,9 +68,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
private FileSplit split;
private long totalRecordsRead;
- public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
- List<SchemaPath> columns) {
- this.fragmentContext = context;
+ public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context,
+ char delimiter, List<SchemaPath> columns) {
this.delimiter = (byte) delimiter;
this.split = split;
setColumns(columns);
@@ -95,7 +91,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
Collections.sort(columnIds);
numCols = columnIds.size();
}
- targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
TextInputFormat inputFormat = new TextInputFormat();
JobConf job = new JobConf(fsConf);
@@ -122,14 +117,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
}).isPresent();
}
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
-
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
@@ -155,6 +142,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
int batchSize = 0;
try {
int recordCount = 0;
+ final RepeatedVarCharVector.Mutator mutator = vector.getMutator();
while (recordCount < Character.MAX_VALUE && batchSize < 200*1000 && reader.next(key, value)) {
int start;
int end = -1;
@@ -162,7 +150,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
// index of the scanned field
int p = 0;
int i = 0;
- vector.getMutator().startNewValue(recordCount);
+ mutator.startNewValue(recordCount);
// Process each field in this line
while (end < value.getLength() - 1) {
if(numCols > 0 && p >= numCols) {
@@ -178,24 +166,24 @@ public class DrillTextRecordReader extends AbstractRecordReader {
}
}
if (numCols > 0 && i++ < columnIds.get(p)) {
- vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0);
+ mutator.addSafe(recordCount, value.getBytes(), start + 1, 0);
continue;
}
p++;
- vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
+ mutator.addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
batchSize += end - start;
}
recordCount++;
totalRecordsRead++;
}
- for (ValueVector v : vectors) {
+ for (final ValueVector v : vectors) {
v.getMutator().setValueCount(recordCount);
}
- vector.getMutator().setValueCount(recordCount);
-// logger.debug("text scan batch size {}", batchSize);
+ mutator.setValueCount(recordCount);
+ // logger.debug("text scan batch size {}", batchSize);
return recordCount;
} catch(Exception e) {
- cleanup();
+ close();
handleAndRaise("Failure while parsing text. Parser was at record: " + (totalRecordsRead + 1), e);
}
@@ -229,7 +217,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
}
@Override
- public void cleanup() {
+ public void close() {
try {
if (reader != null) {
reader.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 8639e1c..efe877d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -27,11 +27,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class TestJsonRecordReader extends BaseTestQuery{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
+public class TestJsonRecordReader extends BaseTestQuery {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
@Test
- public void testComplexJsonInput() throws Exception{
+ public void testComplexJsonInput() throws Exception {
// test("select z[0]['orange'] from cp.`jsoninput/input2.json` limit 10");
test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10 ");
// test("select x from cp.`jsoninput/input2.json`");
@@ -45,14 +45,14 @@ public class TestJsonRecordReader extends BaseTestQuery{
}
@Test
- public void testComplexMultipleTimes() throws Exception{
- for(int i =0 ; i < 5; i++){
+ public void testComplexMultipleTimes() throws Exception {
+ for(int i =0 ; i < 5; i++) {
test("select * from cp.`join/merge_join.json`");
}
}
@Test
- public void trySimpleQueryWithLimit() throws Exception{
+ public void trySimpleQueryWithLimit() throws Exception {
test("select * from cp.`limit/test1.json` limit 10");
}
@@ -90,9 +90,7 @@ public class TestJsonRecordReader extends BaseTestQuery{
@Test //DRILL-1832
public void testJsonWithNulls1() throws Exception {
-
final String query="select * from cp.`jsoninput/twitter_43.json`";
-
testBuilder()
.sqlQuery(query)
.unOrdered()
@@ -102,9 +100,7 @@ public class TestJsonRecordReader extends BaseTestQuery{
@Test //DRILL-1832
public void testJsonWithNulls2() throws Exception {
-
final String query="select SUM(1) as `sum_Number_of_Records_ok` from cp.`/jsoninput/twitter_43.json` having (COUNT(1) > 0)";
-
testBuilder()
.sqlQuery(query)
.unOrdered()
http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 8ded703..14cfd8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -386,7 +386,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
}
}
-
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
@@ -395,7 +394,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
}
-
@Test
public void testMultipleRowGroups() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -544,7 +542,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
"unused, no file is generated", 1, props, QueryType.LOGICAL);
- fields = new HashMap();
+ fields = new HashMap<>();
props = new ParquetTestProperties(1, 100000, DEFAULT_BYTES_PER_PAGE, fields);
TestFileGenerator.populatePigTPCHSupplierFields(props);
readEntries = "\"/tmp/tpc-h/supplier\"";
@@ -602,11 +600,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, QueryType.PHYSICAL);
}
- public static void main(String[] args) throws Exception {
- // TODO - not sure why this has a main method, test below can be run directly
- //new ParquetRecordReaderTest().testPerformance();
- }
-
@Test
@Ignore
public void testPerformance(@Injectable final DrillbitContext bitContext,
@@ -656,7 +649,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
totalRowCount += rowCount;
}
System.out.println(String.format("Time completed: %s. ", watch.elapsed(TimeUnit.MILLISECONDS)));
- rr.cleanup();
+ rr.close();
}
allocator.close();