You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/09/18 01:14:45 UTC

drill git commit: DRILL-1942-readers: - add extends AutoCloseable to RecordReader, and rename cleanup() to close(). - fix many warnings - formatting fixes

Repository: drill
Updated Branches:
  refs/heads/master 5fab01fab -> e52d473eb


DRILL-1942-readers:
- add extends AutoCloseable to RecordReader, and rename cleanup() to close().
- fix many warnings
- formatting fixes

DRILL-1942-readers:
- renamed cleanup() to close in the new JdbcRecordReader

Close apache/drill#154


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e52d473e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e52d473e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e52d473e

Branch: refs/heads/master
Commit: e52d473eb465699bf145cac70662809f1feae78e
Parents: 5fab01f
Author: Chris Westin <cw...@yahoo.com>
Authored: Thu Sep 10 18:28:00 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Thu Sep 17 13:52:11 2015 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 40 +++++-----
 .../drill/exec/store/hive/HiveRecordReader.java |  5 +-
 .../drill/exec/store/jdbc/JdbcRecordReader.java |  2 +-
 .../exec/store/mongo/MongoRecordReader.java     |  4 +-
 .../drill/exec/physical/impl/ScanBatch.java     | 82 ++++++++++----------
 .../apache/drill/exec/store/RecordReader.java   | 15 ++--
 .../drill/exec/store/avro/AvroRecordReader.java |  2 +-
 .../exec/store/easy/json/JSONRecordReader.java  | 33 ++++----
 .../compliant/CompliantTextRecordReader.java    |  2 +-
 .../drill/exec/store/mock/MockRecordReader.java | 51 ++++--------
 .../columnreaders/ParquetRecordReader.java      | 44 +++++------
 .../exec/store/parquet2/DrillParquetReader.java | 36 ++++-----
 .../drill/exec/store/pojo/PojoRecordReader.java |  6 +-
 .../exec/store/text/DrillTextRecordReader.java  | 40 ++++------
 .../exec/store/json/TestJsonRecordReader.java   | 16 ++--
 .../store/parquet/ParquetRecordReaderTest.java  | 11 +--
 16 files changed, 160 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 9458db2..ba10592 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -31,9 +31,9 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -75,7 +75,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
   private boolean rowKeyOnly;
 
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
-      List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
+      List<SchemaPath> projectedColumns, FragmentContext context) {
     hbaseConf = conf;
     hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
@@ -169,15 +169,16 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     done:
     for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
       Result result = null;
+      final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
       try {
-        if (operatorContext != null) {
-          operatorContext.getStats().startWait();
+        if (operatorStats != null) {
+          operatorStats.startWait();
         }
         try {
           result = resultScanner.next();
         } finally {
-          if (operatorContext != null) {
-            operatorContext.getStats().stopWait();
+          if (operatorStats != null) {
+            operatorStats.stopWait();
           }
         }
       } catch (IOException e) {
@@ -193,20 +194,20 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
         rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
       }
       if (!rowKeyOnly) {
-        for (Cell cell : cells) {
-          int familyOffset = cell.getFamilyOffset();
-          int familyLength = cell.getFamilyLength();
-          byte[] familyArray = cell.getFamilyArray();
-          MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
+        for (final Cell cell : cells) {
+          final int familyOffset = cell.getFamilyOffset();
+          final int familyLength = cell.getFamilyLength();
+          final byte[] familyArray = cell.getFamilyArray();
+          final MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
 
-          int qualifierOffset = cell.getQualifierOffset();
-          int qualifierLength = cell.getQualifierLength();
-          byte[] qualifierArray = cell.getQualifierArray();
-          NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
+          final int qualifierOffset = cell.getQualifierOffset();
+          final int qualifierLength = cell.getQualifierLength();
+          final byte[] qualifierArray = cell.getQualifierArray();
+          final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
 
-          int valueOffset = cell.getValueOffset();
-          int valueLength = cell.getValueLength();
-          byte[] valueArray = cell.getValueArray();
+          final int valueOffset = cell.getValueOffset();
+          final int valueLength = cell.getValueLength();
+          final byte[] valueArray = cell.getValueArray();
           v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
         }
       }
@@ -246,7 +247,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (resultScanner != null) {
         resultScanner.close();
@@ -267,5 +268,4 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
       rowKeyVector.getMutator().setValueCount(count);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e87ec6..dc1bae3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -133,7 +133,7 @@ public class HiveRecordReader extends AbstractRecordReader {
     String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
     try {
       format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
-      Class c = Class.forName(sLib);
+      Class<?> c = Class.forName(sLib);
       serde = (SerDe) c.getConstructor().newInstance();
       serde.initialize(job, properties);
     } catch (ReflectiveOperationException | SerDeException e) {
@@ -286,7 +286,6 @@ public class HiveRecordReader extends AbstractRecordReader {
   }
 
   private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    boolean success;
     for (int i = 0; i < selectedColumnNames.size(); i++) {
       String columnName = selectedColumnNames.get(i);
       Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
@@ -311,7 +310,7 @@ public class HiveRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 69c45c2..463ae67 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -229,7 +229,7 @@ class JdbcRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     AutoCloseables.close(resultSet, logger);
     AutoCloseables.close(statement, logger);
     AutoCloseables.close(connection, logger);

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index c8b0699..dd5fcdf 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -52,7 +52,7 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 
 public class MongoRecordReader extends AbstractRecordReader {
-  static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
   private MongoCollection<Document> collection;
   private MongoCursor<Document> cursor;
@@ -187,7 +187,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 873ae76..1ac4f7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -92,47 +92,49 @@ public class ScanBatch implements CloseableRecordBatch {
     if (!readers.hasNext()) {
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
     }
-    this.currentReader = readers.next();
+    currentReader = readers.next();
     this.oContext = oContext;
 
     boolean setup = false;
     try {
       oContext.getStats().startProcessing();
-      this.currentReader.setup(oContext, mutator);
+      currentReader.setup(oContext, mutator);
       setup = true;
     } finally {
       // if we had an exception during setup, make sure to release existing data.
       if (!setup) {
-        currentReader.cleanup();
+        try {
+          currentReader.close();
+        } catch(final Exception e) {
+          throw new ExecutionSetupException(e);
+        }
       }
       oContext.getStats().stopProcessing();
     }
     this.partitionColumns = partitionColumns.iterator();
-    this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
+    partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
     this.selectedPartitionColumns = selectedPartitionColumns;
 
     // TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
     // options; so labelValue = null.
     final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
-    this.partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
+    partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
 
     addPartitionVectors();
   }
 
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers)
+      throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
         readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 
+  @Override
   public FragmentContext getContext() {
     return context;
   }
 
-  public OperatorContext getOperatorContext() {
-    return oContext;
-  }
-
   @Override
   public BatchSchema getSchema() {
     return schema;
@@ -156,6 +158,12 @@ public class ScanBatch implements CloseableRecordBatch {
     container.zeroVectors();
   }
 
+  private void clearFieldVectorMap() {
+    for (final ValueVector v : fieldVectorMap.values()) {
+      v.clear();
+    }
+  }
+
   @Override
   public IterOutcome next() {
     if (done) {
@@ -169,15 +177,13 @@ public class ScanBatch implements CloseableRecordBatch {
         currentReader.allocate(fieldVectorMap);
       } catch (OutOfMemoryException | OutOfMemoryRuntimeException e) {
         logger.debug("Caught Out of Memory Exception", e);
-        for (ValueVector v : fieldVectorMap.values()) {
-          v.clear();
-        }
+        clearFieldVectorMap();
         return IterOutcome.OUT_OF_MEMORY;
       }
       while ((recordCount = currentReader.next()) == 0) {
         try {
           if (!readers.hasNext()) {
-            currentReader.cleanup();
+            currentReader.close();
             releaseAssets();
             done = true;
             if (mutator.isNewSchema()) {
@@ -196,7 +202,7 @@ public class ScanBatch implements CloseableRecordBatch {
             fieldVectorMap.clear();
           }
 
-          currentReader.cleanup();
+          currentReader.close();
           currentReader = readers.next();
           partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
           currentReader.setup(oContext, mutator);
@@ -204,9 +210,7 @@ public class ScanBatch implements CloseableRecordBatch {
             currentReader.allocate(fieldVectorMap);
           } catch (OutOfMemoryException e) {
             logger.debug("Caught OutOfMemoryException");
-            for (ValueVector v : fieldVectorMap.values()) {
-              v.clear();
-            }
+            clearFieldVectorMap();
             return IterOutcome.OUT_OF_MEMORY;
           }
           addPartitionVectors();
@@ -249,7 +253,7 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void addPartitionVectors() throws ExecutionSetupException{
+  private void addPartitionVectors() throws ExecutionSetupException {
     try {
       if (partitionVectors != null) {
         for (ValueVector v : partitionVectors) {
@@ -258,8 +262,10 @@ public class ScanBatch implements CloseableRecordBatch {
       }
       partitionVectors = Lists.newArrayList();
       for (int i : selectedPartitionColumns) {
-        MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
-        ValueVector v = mutator.addField(field, NullableVarCharVector.class);
+        final MaterializedField field =
+            MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
+                Types.optional(MinorType.VARCHAR));
+        final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
         partitionVectors.add(v);
       }
     } catch(SchemaChangeException e) {
@@ -269,12 +275,12 @@ public class ScanBatch implements CloseableRecordBatch {
 
   private void populatePartitionVectors() {
     for (int index = 0; index < selectedPartitionColumns.size(); index++) {
-      int i = selectedPartitionColumns.get(index);
-      NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
+      final int i = selectedPartitionColumns.get(index);
+      final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
       if (partitionValues.length > i) {
-        String val = partitionValues[i];
+        final String val = partitionValues[i];
         AllocationHelper.allocate(v, recordCount, val.length());
-        byte[] bytes = val.getBytes();
+        final byte[] bytes = val.getBytes();
         for (int j = 0; j < recordCount; j++) {
           v.getMutator().setSafe(j, bytes, 0, bytes.length);
         }
@@ -306,27 +312,24 @@ public class ScanBatch implements CloseableRecordBatch {
     return container.getValueAccessorById(clazz, ids);
   }
 
-
-
   private class Mutator implements OutputMutator {
+    private boolean schemaChange = true;
 
-    boolean schemaChange = true;
-
-    @SuppressWarnings("unchecked")
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       // Check if the field exists
       ValueVector v = fieldVectorMap.get(field.key());
-
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
         if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+          throw new SchemaChangeException(String.format(
+              "The class that was provided %s does not correspond to the expected vector type of %s.",
+              clazz.getSimpleName(), v.getClass().getSimpleName()));
         }
 
-        ValueVector old = fieldVectorMap.put(field.key(), v);
-        if(old != null){
+        final ValueVector old = fieldVectorMap.put(field.key(), v);
+        if (old != null) {
           old.clear();
           container.remove(old);
         }
@@ -336,12 +339,12 @@ public class ScanBatch implements CloseableRecordBatch {
         schemaChange = true;
       }
 
-      return (T) v;
+      return clazz.cast(v);
     }
 
     @Override
     public void allocate(int recordCount) {
-      for (ValueVector v : fieldVectorMap.values()) {
+      for (final ValueVector v : fieldVectorMap.values()) {
         AllocationHelper.allocate(v, recordCount, 50, 10);
       }
     }
@@ -378,18 +381,17 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     container.clear();
-    for (ValueVector v : partitionVectors) {
+    for (final ValueVector v : partitionVectors) {
       v.clear();
     }
     fieldVectorMap.clear();
-    currentReader.cleanup();
+    currentReader.close();
   }
 
   @Override
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 61ccac5..c2ab0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,30 +26,27 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.vector.ValueVector;
 
-public interface RecordReader {
-
+public interface RecordReader extends AutoCloseable {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
   /**
    * Configure the RecordReader with the provided schema and the record batch that should be written to.
    *
+   * @param context operator context for the reader
    * @param output
    *          The place where output for a particular scan should be written. The record reader is responsible for
    *          mutating the set of schema values for that particular record.
    * @throws ExecutionSetupException
    */
-  public abstract void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
+  void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
 
-  public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
+  void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
 
   /**
    * Increment record reader forward, writing into the provided output batch.
    *
    * @return The number of additional records added to the output.
    */
-  public abstract int next();
-
-  public abstract void cleanup();
-
-}
\ No newline at end of file
+  int next();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index a09cd53..210ed9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -351,7 +351,7 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (reader != null) {
       try {
         reader.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 8e78cf1..4d51199 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -57,7 +57,6 @@ public class JSONRecordReader extends AbstractRecordReader {
   private int recordCount;
   private long runningRecordCount = 0;
   private final FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
 
@@ -82,13 +81,14 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-      final List<SchemaPath> columns) throws OutOfMemoryException {
+  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
+      final DrillFileSystem fileSystem, final List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, null, embeddedContent, fileSystem, columns);
   }
 
-  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-                          final List<SchemaPath> columns) throws OutOfMemoryException {
+  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
+      final JsonNode embeddedContent, final DrillFileSystem fileSystem,
+      final List<SchemaPath> columns) {
 
     Preconditions.checkArgument(
         (inputPath == null && embeddedContent != null) ||
@@ -96,9 +96,9 @@ public class JSONRecordReader extends AbstractRecordReader {
         "One of inputPath or embeddedContent must be set but not both."
         );
 
-    if(inputPath != null){
+    if(inputPath != null) {
       this.hadoopPath = new Path(inputPath);
-    }else{
+    } else {
       this.embeddedContent = embeddedContent;
     }
 
@@ -113,7 +113,6 @@ public class JSONRecordReader extends AbstractRecordReader {
 
   @Override
   public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
-    this.operatorContext = context;
     try{
       if (hadoopPath != null) {
         this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
@@ -131,7 +130,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void setupParser() throws IOException{
+  private void setupParser() throws IOException {
     if(hadoopPath != null){
       jsonReader.setSource(stream);
     }else{
@@ -177,11 +176,11 @@ public class JSONRecordReader extends AbstractRecordReader {
     ReadState write = null;
 //    Stopwatch p = new Stopwatch().start();
     try{
-      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION){
+      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
 
-        if(write == ReadState.WRITE_SUCCEED){
+        if(write == ReadState.WRITE_SUCCEED) {
 //          logger.debug("Wrote record.");
           recordCount++;
         }else{
@@ -198,7 +197,6 @@ public class JSONRecordReader extends AbstractRecordReader {
 //      System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
 
       updateRunningCount();
-
       return recordCount;
 
     } catch (final Exception e) {
@@ -213,14 +211,9 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
-    try {
-      if(stream != null){
-        stream.close();
-      }
-    } catch (final IOException e) {
-      logger.warn("Failure while closing stream.", e);
+  public void close() throws Exception {
+    if(stream != null) {
+      stream.close();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index ae11ba7..ad65a94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -144,7 +144,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
    * This would internally close the input stream we are reading from.
    */
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index fd97c48..c742690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -38,23 +37,19 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class MockRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
 
-  private OutputMutator output;
-  private MockScanEntry config;
-  private FragmentContext context;
-  private BufferAllocator alcator;
+  private final MockScanEntry config;
+  private final FragmentContext context;
   private ValueVector[] valueVectors;
   private int recordsRead;
   private int batchRecordCount;
-  private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
 
 
-  public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException {
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
     this.context = context;
     this.config = config;
-    this.fragmentContext=context;
   }
 
   private int getEstimatedRecordSize(MockColumn[] types) {
@@ -67,38 +62,26 @@ public class MockRecordReader extends AbstractRecordReader {
 
   private MaterializedField getVector(String name, MajorType type, int length) {
     assert context != null : "Context shouldn't be null.";
-    MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
-
+    final MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
     return f;
-
-  }
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
   }
 
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
-      this.output = output;
-      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+      final int estimateRowSize = getEstimatedRecordSize(config.getTypes());
       valueVectors = new ValueVector[config.getTypes().length];
       batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
-        MajorType type = config.getTypes()[i].getMajorType();
-        MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
-        Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        final MajorType type = config.getTypes()[i].getMajorType();
+        final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+        final Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
         valueVectors[i] = output.addField(field, vvClass);
       }
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException("Failure while setting up fields", e);
     }
-
   }
 
   @Override
@@ -107,23 +90,20 @@ public class MockRecordReader extends AbstractRecordReader {
       return 0;
     }
 
-    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
-
+    final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
     recordsRead += recordSetSize;
-    for (ValueVector v : valueVectors) {
-
-//      logger.debug(String.format("MockRecordReader:  Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
-      ValueVector.Mutator m = v.getMutator();
+    for (final ValueVector v : valueVectors) {
+      final ValueVector.Mutator m = v.getMutator();
       m.generateTestData(recordSetSize);
-
     }
+
     return recordSetSize;
   }
 
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -132,7 +112,6 @@ public class MockRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index da6fbfb..a4f5cac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -67,9 +67,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
 
   // TODO - should probably find a smarter way to set this, currently 1 megabyte
-  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
   public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
-  private static final String SEPERATOR = System.getProperty("file.separator");
 
   // used for clearing the last n bits of a byte
   public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
@@ -79,16 +77,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private int bitWidthAllFixedFields;
   private boolean allFieldsFixedLength;
   private int recordsPerBatch;
+  private OperatorContext operatorContext;
 //  private long totalRecords;
 //  private long rowGroupOffset;
 
-  private List<ColumnReader> columnStatuses;
+  private List<ColumnReader<?>> columnStatuses;
   private FileSystem fileSystem;
   private long batchSize;
   Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
-  private OperatorContext operatorContext;
   // This is a parallel list to the columns list above, it is used to determine the subset of the project
   // pushdown columns that do not appear in this file
   private boolean[] columnsFound;
@@ -160,14 +158,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
     return batchSize;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   /**
    * @param type a fixed length type from the parquet library enum
    * @return the length in pageDataByteArray of the type
@@ -205,9 +195,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
     return false;
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
   @Override
-  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    this.operatorContext = context;
+  public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = operatorContext;
     if (!isStarQuery()) {
       columnsFound = new boolean[getColumns().size()];
       nullFilledVectors = new ArrayList<>();
@@ -276,7 +270,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     try {
       ValueVector vector;
       SchemaElement schemaElement;
-      ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+      final ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
       boolean fieldFixedLength;
       for (int i = 0; i < columns.size(); ++i) {
@@ -342,7 +336,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -366,17 +360,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   private void resetBatch() {
-    for (ColumnReader column : columnStatuses) {
+    for (final ColumnReader<?> column : columnStatuses) {
       column.valuesReadInCurrentPass = 0;
     }
-    for (VarLengthColumn r : varLengthReader.columns) {
+    for (final VarLengthColumn<?> r : varLengthReader.columns) {
       r.valuesReadInCurrentPass = 0;
     }
   }
 
  public void readAllFixedFields(long recordsToRead) throws IOException {
 
-   for (ColumnReader crs : columnStatuses) {
+   for (ColumnReader<?> crs : columnStatuses) {
      crs.processPages(recordsToRead);
    }
  }
@@ -386,7 +380,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     resetBatch();
     long recordsToRead = 0;
     try {
-      ColumnReader firstColumnStatus;
+      ColumnReader<?> firstColumnStatus;
       if (columnStatuses.size() > 0) {
         firstColumnStatus = columnStatuses.iterator().next();
       }
@@ -404,7 +398,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
           return 0;
         }
         recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
-        for (ValueVector vv : nullFilledVectors ) {
+        for (final ValueVector vv : nullFilledVectors ) {
           vv.getMutator().setValueCount( (int) recordsToRead);
         }
         mockRecordsRead += recordsToRead;
@@ -429,7 +423,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       // if we have requested columns that were not found in the file fill their vectors with null
       // (by simply setting the value counts inside of them, as they start null filled)
       if (nullFilledVectors != null) {
-        for (ValueVector vv : nullFilledVectors ) {
+        for (final ValueVector vv : nullFilledVectors ) {
           vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
         }
       }
@@ -451,13 +445,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
     // enable this for debugging when it is know that a whole file will be read
     // limit kills upstream operators once it has enough records, so this assert will fail
 //    assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
     if (columnStatuses != null) {
-      for (ColumnReader column : columnStatuses) {
+      for (final ColumnReader column : columnStatuses) {
         column.clear();
       }
       columnStatuses.clear();
@@ -467,7 +461,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     codecFactory.close();
 
     if (varLengthReader != null) {
-      for (VarLengthColumn r : varLengthReader.columns) {
+      for (final VarLengthColumn r : varLengthReader.columns) {
         r.clear();
       }
       varLengthReader.columns.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4c49def..01a9853 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -68,7 +68,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class DrillParquetReader extends AbstractRecordReader {
-
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
@@ -155,7 +154,7 @@ public class DrillParquetReader extends AbstractRecordReader {
     }
 
     // loop through projection columns and add any columns that are missing from parquet schema to columnsNotFound list
-    outer: for (SchemaPath columnPath : modifiedColumns) {
+    for (SchemaPath columnPath : modifiedColumns) {
       boolean notFound = true;
       for (SchemaPath schemaPath : schemaPaths) {
         if (schemaPath.contains(columnPath)) {
@@ -191,7 +190,7 @@ public class DrillParquetReader extends AbstractRecordReader {
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -216,7 +215,7 @@ public class DrillParquetReader extends AbstractRecordReader {
             projection = schema;
         }
         if(columnsNotFound!=null && columnsNotFound.size()>0) {
-          nullFilledVectors = new ArrayList();
+          nullFilledVectors = new ArrayList<>();
           for(SchemaPath col: columnsNotFound){
             nullFilledVectors.add(
               (NullableIntVector)output.addField(MaterializedField.create(col,
@@ -234,7 +233,7 @@ public class DrillParquetReader extends AbstractRecordReader {
 
       ColumnIOFactory factory = new ColumnIOFactory(false);
       MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
-      Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap();
+      Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap<>();
 
       for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
         paths.put(md.getPath(), md);
@@ -273,7 +272,7 @@ public class DrillParquetReader extends AbstractRecordReader {
   }
 
   protected void handleAndRaise(String s, Exception e) {
-    cleanup();
+    close();
     String message = "Error in drill parquet reader (complex).\nMessage: " + s +
       "\nParquet Metadata: " + footer;
     throw new DrillRuntimeException(message, e);
@@ -319,7 +318,7 @@ public class DrillParquetReader extends AbstractRecordReader {
     // if we have requested columns that were not found in the file fill their vectors with null
     // (by simply setting the value counts inside of them, as they start null filled)
     if (nullFilledVectors != null) {
-      for (ValueVector vv : nullFilledVectors ) {
+      for (final ValueVector vv : nullFilledVectors) {
         vv.getMutator().setValueCount(count);
       }
     }
@@ -328,7 +327,7 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   private int getPercentFilled() {
     int filled = 0;
-    for (ValueVector v : primitiveVectors) {
+    for (final ValueVector v : primitiveVectors) {
       filled = Math.max(filled, v.getAccessor().getValueCount() * 100 / v.getValueCapacity());
       if (v instanceof VariableWidthVector) {
         filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
@@ -343,7 +342,7 @@ public class DrillParquetReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (pageReadStore != null) {
         pageReadStore.close();
@@ -354,20 +353,13 @@ public class DrillParquetReader extends AbstractRecordReader {
     }
   }
 
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
+  static public class ProjectedColumnType {
+    public final String projectedColumnName;
+    public final MessageType type;
 
-  static public class ProjectedColumnType{
-    ProjectedColumnType(String projectedColumnName, MessageType type){
-      this.projectedColumnName=projectedColumnName;
-      this.type=type;
+    ProjectedColumnType(String projectedColumnName, MessageType type) {
+      this.projectedColumnName = projectedColumnName;
+      this.type = type;
     }
-
-    public String projectedColumnName;
-    public MessageType type;
-
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 543121f..32fa31d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -123,7 +123,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
 
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
-    for (ValueVector v : vectorMap.values()) {
+    for (final ValueVector v : vectorMap.values()) {
       AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
     }
   }
@@ -146,7 +146,6 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
     injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
     try {
       int i =0;
-      outside:
       while (doCurrent || iterator.hasNext()) {
         if (doCurrent) {
           doCurrent = false;
@@ -175,7 +174,6 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index c59ade9..bc675af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -52,17 +52,14 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class DrillTextRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
 
-  static final String COL_NAME = "columns";
+  private static final String COL_NAME = "columns";
 
   private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader;
-  private List<ValueVector> vectors = Lists.newArrayList();
+  private final List<ValueVector> vectors = Lists.newArrayList();
   private byte delimiter;
-  private int targetRecordCount;
   private FieldReference ref = new FieldReference(COL_NAME);
-  private FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
   private RepeatedVarCharVector vector;
   private List<Integer> columnIds = Lists.newArrayList();
   private LongWritable key;
@@ -71,9 +68,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   private FileSplit split;
   private long totalRecordsRead;
 
-  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
-      List<SchemaPath> columns) {
-    this.fragmentContext = context;
+  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context,
+      char delimiter, List<SchemaPath> columns) {
     this.delimiter = (byte) delimiter;
     this.split = split;
     setColumns(columns);
@@ -95,7 +91,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
       Collections.sort(columnIds);
       numCols = columnIds.size();
     }
-    targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
 
     TextInputFormat inputFormat = new TextInputFormat();
     JobConf job = new JobConf(fsConf);
@@ -122,14 +117,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
     }).isPresent();
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
@@ -155,6 +142,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
     int batchSize = 0;
     try {
       int recordCount = 0;
+      final RepeatedVarCharVector.Mutator mutator = vector.getMutator();
       while (recordCount < Character.MAX_VALUE && batchSize < 200*1000 && reader.next(key, value)) {
         int start;
         int end = -1;
@@ -162,7 +150,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
         // index of the scanned field
         int p = 0;
         int i = 0;
-        vector.getMutator().startNewValue(recordCount);
+        mutator.startNewValue(recordCount);
         // Process each field in this line
         while (end < value.getLength() - 1) {
           if(numCols > 0 && p >= numCols) {
@@ -178,24 +166,24 @@ public class DrillTextRecordReader extends AbstractRecordReader {
             }
           }
           if (numCols > 0 && i++ < columnIds.get(p)) {
-            vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0);
+            mutator.addSafe(recordCount, value.getBytes(), start + 1, 0);
             continue;
           }
           p++;
-          vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
+          mutator.addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
           batchSize += end - start;
         }
         recordCount++;
         totalRecordsRead++;
       }
-      for (ValueVector v : vectors) {
+      for (final ValueVector v : vectors) {
         v.getMutator().setValueCount(recordCount);
       }
-      vector.getMutator().setValueCount(recordCount);
-//      logger.debug("text scan batch size {}", batchSize);
+      mutator.setValueCount(recordCount);
+      // logger.debug("text scan batch size {}", batchSize);
       return recordCount;
     } catch(Exception e) {
-      cleanup();
+      close();
       handleAndRaise("Failure while parsing text. Parser was at record: " + (totalRecordsRead + 1), e);
     }
 
@@ -229,7 +217,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 8639e1c..efe877d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -27,11 +27,11 @@ import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.assertTrue;
 
-public class TestJsonRecordReader extends BaseTestQuery{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
+public class TestJsonRecordReader extends BaseTestQuery {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
 
   @Test
-  public void testComplexJsonInput() throws Exception{
+  public void testComplexJsonInput() throws Exception {
 //  test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
     test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink']  from cp.`jsoninput/input2.json` limit 10 ");
 //    test("select x from cp.`jsoninput/input2.json`");
@@ -45,14 +45,14 @@ public class TestJsonRecordReader extends BaseTestQuery{
   }
 
   @Test
-  public void testComplexMultipleTimes() throws Exception{
-    for(int i =0 ; i < 5; i++){
+  public void testComplexMultipleTimes() throws Exception {
+    for(int i =0 ; i < 5; i++) {
     test("select * from cp.`join/merge_join.json`");
     }
   }
 
   @Test
-  public void trySimpleQueryWithLimit() throws Exception{
+  public void trySimpleQueryWithLimit() throws Exception {
     test("select * from cp.`limit/test1.json` limit 10");
   }
 
@@ -90,9 +90,7 @@ public class TestJsonRecordReader extends BaseTestQuery{
 
   @Test //DRILL-1832
   public void testJsonWithNulls1() throws Exception {
-
     final String query="select * from cp.`jsoninput/twitter_43.json`";
-
     testBuilder()
             .sqlQuery(query)
             .unOrdered()
@@ -102,9 +100,7 @@ public class TestJsonRecordReader extends BaseTestQuery{
 
   @Test //DRILL-1832
   public void testJsonWithNulls2() throws Exception {
-
     final String query="select SUM(1) as `sum_Number_of_Records_ok` from cp.`/jsoninput/twitter_43.json` having (COUNT(1) > 0)";
-
     testBuilder()
             .sqlQuery(query)
             .unOrdered()

http://git-wip-us.apache.org/repos/asf/drill/blob/e52d473e/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 8ded703..14cfd8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -386,7 +386,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     }
   }
 
-
   private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
       throws IOException {
     PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
@@ -395,7 +394,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
   }
 
-
   @Test
   public void testMultipleRowGroups() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -544,7 +542,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
         "unused, no file is generated", 1, props, QueryType.LOGICAL);
 
-    fields = new HashMap();
+    fields = new HashMap<>();
     props = new ParquetTestProperties(1, 100000, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populatePigTPCHSupplierFields(props);
     readEntries = "\"/tmp/tpc-h/supplier\"";
@@ -602,11 +600,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, QueryType.PHYSICAL);
   }
 
-  public static void main(String[] args) throws Exception {
-    // TODO - not sure why this has a main method, test below can be run directly
-    //new ParquetRecordReaderTest().testPerformance();
-  }
-
   @Test
   @Ignore
   public void testPerformance(@Injectable final DrillbitContext bitContext,
@@ -656,7 +649,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
         totalRowCount += rowCount;
       }
       System.out.println(String.format("Time completed: %s. ", watch.elapsed(TimeUnit.MILLISECONDS)));
-      rr.cleanup();
+      rr.close();
     }
 
     allocator.close();