You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/01/24 18:35:24 UTC

[10/11] drill git commit: DRILL-6049: Misc. hygiene and code cleanup changes

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 1b96f28..a38c3c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -32,8 +32,9 @@ import com.carrotsearch.hppc.cursors.IntDoubleCursor;
 import com.carrotsearch.hppc.cursors.IntLongCursor;
 import com.carrotsearch.hppc.procedures.IntDoubleProcedure;
 import com.carrotsearch.hppc.procedures.IntLongProcedure;
+import com.google.common.annotations.VisibleForTesting;
 
-public class OperatorStats implements OperatorStatReceiver {
+public class OperatorStats {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
 
   protected final int operatorId;
@@ -89,7 +90,8 @@ public class OperatorStats implements OperatorStatReceiver {
     }
   }
 
-  private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
+  @VisibleForTesting
+  public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
     super();
     this.allocator = allocator;
     this.operatorId = operatorId;
@@ -169,7 +171,6 @@ public class OperatorStats implements OperatorStatReceiver {
     inProcessing = false;
   }
 
-  @Override
   public synchronized void startWait() {
     assert !inWait : assertionError("starting waiting");
     stopProcessing();
@@ -177,7 +178,6 @@ public class OperatorStats implements OperatorStatReceiver {
     waitMark = System.nanoTime();
   }
 
-  @Override
   public synchronized void stopWait() {
     assert inWait : assertionError("stopping waiting");
     startProcessing();
@@ -203,7 +203,6 @@ public class OperatorStats implements OperatorStatReceiver {
         .toString();
   }
 
-
   public OperatorProfile getProfile() {
     final OperatorProfile.Builder b = OperatorProfile //
         .newBuilder() //
@@ -213,14 +212,11 @@ public class OperatorStats implements OperatorStatReceiver {
         .setProcessNanos(processingNanos)
         .setWaitNanos(waitNanos);
 
-    if(allocator != null){
+    if (allocator != null) {
       b.setPeakLocalMemoryAllocated(allocator.getPeakMemoryAllocation());
     }
 
-
-
     addAllMetrics(b);
-
     return b.build();
   }
 
@@ -249,7 +245,6 @@ public class OperatorStats implements OperatorStatReceiver {
     public void apply(int key, long value) {
       builder.addMetric(MetricValue.newBuilder().setMetricId(key).setLongValue(value));
     }
-
   }
 
   public void addLongMetrics(OperatorProfile.Builder builder) {
@@ -278,22 +273,62 @@ public class OperatorStats implements OperatorStatReceiver {
     }
   }
 
-  @Override
+  /**
+   * Set a stat to the specified long value. Creates the stat
+   * if the stat does not yet exist.
+   *
+   * @param metric the metric to update
+   * @param value the value to set
+   */
+
   public void addLongStat(MetricDef metric, long value){
     longMetrics.putOrAdd(metric.metricId(), value, value);
   }
 
-  @Override
+  @VisibleForTesting
+  public long getLongStat(MetricDef metric) {
+    return longMetrics.get(metric.metricId());
+  }
+
+  /**
+   * Add a double value to the existing value. Creates the stat
+   * (with an initial value of zero) if the stat does not yet
+   * exist.
+   *
+   * @param metric the metric to update
+   * @param value the value to add to the existing value
+   */
+
   public void addDoubleStat(MetricDef metric, double value){
     doubleMetrics.putOrAdd(metric.metricId(), value, value);
   }
 
-  @Override
+  @VisibleForTesting
+  public double getDoubleStat(MetricDef metric) {
+    return doubleMetrics.get(metric.metricId());
+  }
+
+  /**
+   * Add a long value to the existing value. Creates the stat
+   * (with an initial value of zero) if the stat does not yet
+   * exist.
+   *
+   * @param metric the metric to update
+   * @param value the value to add to the existing value
+   */
+
   public void setLongStat(MetricDef metric, long value){
     longMetrics.put(metric.metricId(), value);
   }
 
-  @Override
+  /**
+   * Set a stat to the specified double value. Creates the stat
+   * if the stat does not yet exist.
+   *
+   * @param metric the metric to update
+   * @param value the value to set
+   */
+
   public void setDoubleStat(MetricDef metric, double value){
     doubleMetrics.put(metric.metricId(), value);
   }
@@ -313,5 +348,4 @@ public class OperatorStats implements OperatorStatReceiver {
   public long getProcessingNanos() {
     return processingNanos;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index d17c337..d42680a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -40,7 +40,7 @@ public interface GroupScan extends Scan, HasAffinity{
    *                             2) NULL is interpreted as ALL_COLUMNS.
    *  How to handle skipAll query is up to each storage plugin, with different policy in corresponding RecordReader.
    */
-  public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.getSimplePath("*"));
+  public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);
 
   public static final long NO_COLUMN_STATS = -1;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 0871621..b418fd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -75,7 +75,7 @@ public class ImplCreator {
     // to true.
 
     if (AssertionUtil.isAssertionsEnabled() ||
-        context.getOptionSet().getOption(ExecConstants.ENABLE_ITERATOR_VALIDATOR) ||
+        context.getOptions().getOption(ExecConstants.ENABLE_ITERATOR_VALIDATOR) ||
         context.getConfig().getBoolean(ExecConstants.ENABLE_ITERATOR_VALIDATION)) {
       root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 77e9ea4..e0d1545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -96,7 +96,7 @@ public class ScanBatch implements CloseableRecordBatch {
     this.readers = readerList.iterator();
     this.implicitColumns = implicitColumnList.iterator();
     if (!readers.hasNext()) {
-      throw UserException.systemError(
+      throw UserException.internalError(
           new ExecutionSetupException("A scan batch must contain at least one reader."))
         .build(logger);
     }
@@ -110,7 +110,7 @@ public class ScanBatch implements CloseableRecordBatch {
       if (!verifyImplcitColumns(readerList.size(), implicitColumnList)) {
         Exception ex = new ExecutionSetupException("Either implicit column list does not have same cardinality as reader list, "
             + "or implicit columns are not same across all the record readers!");
-        throw UserException.systemError(ex)
+        throw UserException.internalError(ex)
             .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
             .build(logger);
       }
@@ -210,11 +210,13 @@ public class ScanBatch implements CloseableRecordBatch {
           logger.error("Close failed for reader " + currentReaderClassName, e2);
         }
       }
-      throw UserException.systemError(e)
+      throw UserException.internalError(e)
           .addContext("Setup failed for", currentReaderClassName)
           .build(logger);
+    } catch (UserException ex) {
+      throw ex;
     } catch (Exception ex) {
-      throw UserException.systemError(ex).build(logger);
+      throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -254,7 +256,7 @@ public class ScanBatch implements CloseableRecordBatch {
       }
     } catch(SchemaChangeException e) {
       // No exception should be thrown here.
-      throw UserException.systemError(e)
+      throw UserException.internalError(e)
         .addContext("Failure while allocating implicit vectors")
         .build(logger);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 34c0f94..442a753 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
     throws SchemaChangeException, ClassTransformationException, IOException {
     return createNewPriorityQueue(
-      mainMapping, leftMapping, rightMapping, context.getOptionSet(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(),
+      mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(),
       config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3abf0fc..be0f61f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -300,7 +300,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return false;
     }
     final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
-    return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+    return expr.getPath().contains(SchemaPath.WILDCARD);
   }
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
@@ -542,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
       final NameSegment ref = ex.getRef().getRootSegment();
       final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-      final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+      final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD);
 
       if (refHasPrefix || exprContainsStar) {
         needed = true;
@@ -596,10 +596,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     final NameSegment ref = ex.getRef().getRootSegment();
     final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
     final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    final boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
-    final boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    final boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
+    final boolean exprIsStar = expr.getPath().equals(SchemaPath.WILDCARD);
+    final boolean refContainsStar = ref.getPath().contains(SchemaPath.WILDCARD);
+    final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD);
+    final boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.WILDCARD);
 
     String exprPrefix = EMPTY_STRING;
     String exprSuffix = expr.getPath();

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index ac6a462..e75619e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.validate;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
@@ -30,11 +35,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.VectorValidator;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.*;
-
 
 public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger =

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 2288419..4199191 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -38,7 +38,7 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
     Preconditions.checkArgument(children.size() == 1);
     RecordBatch child = children.iterator().next();
     IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child);
-    boolean validateBatches = context.getOptionSet().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
+    boolean validateBatches = context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
                               context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
     iter.enableBatchValidation(validateBatches);
     logger.trace("Iterator validation enabled for " + child.getClass().getSimpleName() +

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 2054c9b..9150fe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -486,7 +486,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   @Override
   public void close() {
+
+    // Sanity check: if close is called twice, just ignore
+    // the second call.
+
+    if (sortImpl == null) {
+      return;
+    }
+
     RuntimeException ex = null;
+
+    // If we got far enough to have a results iterator, close
+    // that first.
+
     try {
       if (resultsIterator != null) {
         resultsIterator.close();
@@ -495,6 +507,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
     }
+
+    // Then close the "guts" of the sort operation.
+
     try {
       if (sortImpl != null) {
         sortImpl.close();
@@ -506,14 +521,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // The call to super.close() clears out the output container.
     // Doing so requires the allocator here, so it must be closed
-    // after the super call.
+    // (when closing the operator context) after the super call.
 
     try {
       super.close();
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
     }
-    // Note: allocator is closed by the FragmentManager
+
+    // Finally close the operator context (which closes the
+    // child allocator.)
+
+    try {
+      oContext.close();
+    } catch (RuntimeException e) {
+      ex = ex == null ? e : ex;
+    }
     if (ex != null) {
       throw ex;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index dee24dc..bca28f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -142,7 +142,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
   }
 
   private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
-    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet());
+    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions());
     cg.plainJavaCapable(true);
 
     // Uncomment out this line to debug the generated code.

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 4d21b11..dda42a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -80,7 +80,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
   private PriorityQueueCopier newCopier(VectorAccessible batch) {
     // Generate the copier code and obtain the resulting class
 
-    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet());
+    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions());
     ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 2d53c3b..9fb478e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -36,6 +36,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of the external sort which is wrapped into the Drill
  * "next" protocol by the {@link ExternalSortBatch} class.
@@ -105,7 +107,6 @@ public class SortImpl {
     public VectorContainer getContainer() { return dest; }
   }
 
-
   /**
    * Return results for a single input batch. No merge is needed;
    * the original (sorted) input batch is simply passed as the result.
@@ -200,7 +201,7 @@ public class SortImpl {
     allocator = opContext.getAllocator();
     config = sortConfig;
     memManager = new SortMemoryManager(config, allocator.getLimit());
-    metrics = new SortMetrics(opContext.getStatsWriter());
+    metrics = new SortMetrics(opContext.getStats());
     bufferedBatches = new BufferedBatches(opContext);
 
     // Request leniency from the allocator. Leniency
@@ -215,6 +216,9 @@ public class SortImpl {
     logger.debug("Config: Is allocator lenient? {}", allowed);
   }
 
+  @VisibleForTesting
+  public OperatorContext opContext() { return context; }
+
   public void setSchema(BatchSchema schema) {
     bufferedBatches.setSchema(schema);
     spilledRuns.setSchema(schema);
@@ -541,6 +545,11 @@ public class SortImpl {
     } catch (RuntimeException e) {
       ex = ex == null ? e : ex;
     }
+
+    // Note: don't close the operator context here. It must
+    // remain open until all containers are cleared, which
+    // is done in the ExternalSortBatch class.
+
     if (ex != null) {
       throw ex;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
index 8d20cca..ae436bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.xsort.managed;
 
-import org.apache.drill.exec.ops.OperatorStatReceiver;
+import org.apache.drill.exec.ops.OperatorStats;
 
 public class SortMetrics {
 
@@ -38,12 +38,12 @@ public class SortMetrics {
    */
 
   private long minimumBufferSpace;
-  private OperatorStatReceiver stats;
+  private OperatorStats stats;
   private int spillCount;
   private int mergeCount;
   private long writeBytes;
 
-  public SortMetrics(OperatorStatReceiver stats) {
+  public SortMetrics(OperatorStats stats) {
     assert stats != null;
     this.stats = stats;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
index 1d43128..a9785ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -78,7 +78,7 @@ public class SorterWrapper extends BaseSortWrapper {
 
   private SingleBatchSorter newSorter(VectorAccessible batch) {
     CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
-        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet());
+        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions());
     ClassGenerator<SingleBatchSorter> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
index 672af42..87cbf86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java
@@ -20,18 +20,16 @@ package org.apache.drill.exec.planner;
 
 import java.util.List;
 import java.util.Map;
-
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
 
 public class StarColumnHelper {
 
   public final static String PREFIX_DELIMITER = "\u00a6\u00a6";
 
-  public final static String STAR_COLUMN = "**";
-
-  public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + STAR_COLUMN;
+  public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + SchemaPath.WILDCARD;
 
   public static boolean containsStarColumn(RelDataType type) {
     if (! type.isStruct()) {
@@ -41,7 +39,7 @@ public class StarColumnHelper {
     List<String> fieldNames = type.getFieldNames();
 
     for (String s : fieldNames) {
-      if (s.startsWith(STAR_COLUMN)) {
+      if (s.startsWith(SchemaPath.WILDCARD)) {
         return true;
       }
     }
@@ -58,7 +56,7 @@ public class StarColumnHelper {
       if (expr instanceof RexInputRef) {
         String name = inputRowType.getFieldNames().get(((RexInputRef) expr).getIndex());
 
-        if (name.startsWith(STAR_COLUMN)) {
+        if (name.startsWith(SchemaPath.WILDCARD)) {
           return true;
         }
       }
@@ -72,7 +70,7 @@ public class StarColumnHelper {
   }
 
   public static boolean isNonPrefixedStarColumn(String fieldName) {
-    return fieldName.startsWith(STAR_COLUMN);
+    return fieldName.startsWith(SchemaPath.WILDCARD);
   }
 
   public static boolean isStarColumn(String fieldName) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 7b52eda..0cc016b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -33,7 +33,6 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
 import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -74,6 +73,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
+import org.apache.drill.exec.vector.DateUtilities;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -315,7 +315,7 @@ public class DrillConstExecutor implements RexExecutor {
                 milliseconds = intervalDayOut.milliseconds;
               }
               return rexBuilder.makeLiteral(
-                  new BigDecimal(days * (long) DateUtility.daysToStandardMillis + milliseconds),
+                  new BigDecimal(days * (long) DateUtilities.daysToStandardMillis + milliseconds),
                   TypeInferenceUtils.createCalciteTypeWithNullability(typeFactory, SqlTypeName.INTERVAL_DAY,
                       newCall.getType().isNullable()), false);
             }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
index 1230498..37e4ca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
@@ -26,8 +26,8 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.UnsupportedOperatorCollector;
-import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.planner.sql.parser.DrillCalciteWrapperUtility;
 import org.apache.drill.exec.util.ApproximateStringMatcher;
@@ -203,7 +203,7 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
   public RelNode visit(LogicalUnion union) {
     for(RelNode child : union.getInputs()) {
       for(RelDataTypeField dataField : child.getRowType().getFieldList()) {
-        if(dataField.getName().contains(StarColumnHelper.STAR_COLUMN)) {
+        if(dataField.getName().contains(SchemaPath.WILDCARD)) {
           unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
               "Union-All over schema-less tables must specify the columns explicitly\n" +
               "See Apache Drill JIRA: DRILL-2414");

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
index 394cde3..f323991 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
@@ -30,8 +30,8 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelConversionException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
@@ -107,7 +107,7 @@ public class SplitUpComplexExpressions extends BasePrelVisitor<Prel, Object, Rel
       RexBuilder builder = new RexBuilder(factory);
       allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), index));
 
-      if(fieldNames.get(index).contains(StarColumnHelper.STAR_COLUMN)) {
+      if(fieldNames.get(index).contains(SchemaPath.WILDCARD)) {
         relDataTypes.add(new RelDataTypeFieldImpl(fieldNames.get(index), allExprs.size(), factory.createSqlType(SqlTypeName.ANY)));
       } else {
         relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + exprIndex, allExprs.size(), factory.createSqlType(SqlTypeName.ANY)));

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 69458d4..c2227c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -157,7 +157,7 @@ public class SqlHandlerUtil {
             .message("Partition column %s is not in the SELECT list of CTAS!", col)
             .build(logger);
       } else {
-        if (field.getName().startsWith(StarColumnHelper.STAR_COLUMN)) {
+        if (field.getName().startsWith(SchemaPath.WILDCARD)) {
           colRefStarNames.add(col);
 
           final List<RexNode> operands = Lists.newArrayList();
@@ -191,10 +191,12 @@ public class SqlHandlerUtil {
 
       final List<RexNode> refs =
           new AbstractList<RexNode>() {
+            @Override
             public int size() {
               return originalFieldSize + colRefStarExprs.size();
             }
 
+            @Override
             public RexNode get(int index) {
               if (index < originalFieldSize) {
                 return RexInputRef.of(index, inputRowType.getFieldList());

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
index 377c7af..9037340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpandableHyperContainer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,6 +27,10 @@ public class ExpandableHyperContainer extends VectorContainer {
 
   public ExpandableHyperContainer(VectorAccessible batch) {
     super();
+    build(batch);
+  }
+
+  private void build(VectorAccessible batch) {
     if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {
       for (VectorWrapper<?> w : batch) {
         ValueVector[] hyperVector = w.getValueVectors();
@@ -42,17 +46,7 @@ public class ExpandableHyperContainer extends VectorContainer {
 
   public void addBatch(VectorAccessible batch) {
     if (wrappers.size() == 0) {
-      if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {
-        for (VectorWrapper<?> w : batch) {
-          ValueVector[] hyperVector = w.getValueVectors();
-          this.add(hyperVector, true);
-        }
-      } else {
-        for (VectorWrapper<?> w : batch) {
-          ValueVector[] hyperVector = { w.getValueVector() };
-          this.add(hyperVector, true);
-        }
-      }
+      build(batch);
       return;
     }
     if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 3e6bf64..f180b40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -61,6 +61,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     this.allocator = Preconditions.checkNotNull(allocator);
   }
 
+  public BufferAllocator allocator() { return allocator; }
+
   /**
    * Load a record batch from a single buffer.
    *
@@ -88,7 +90,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 
     // Set up to recognize previous fields that no longer exist.
     final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
-    for(final VectorWrapper<?> wrapper : container) {
+    for (final VectorWrapper<?> wrapper : container) {
       final ValueVector vector = wrapper.getValueVector();
       oldFields.put(vector.getField().getName(), vector);
     }
@@ -97,7 +99,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     try {
       final List<SerializedField> fields = def.getFieldList();
       int bufOffset = 0;
-      for(final SerializedField field : fields) {
+      for (final SerializedField field : fields) {
         final MaterializedField fieldDef = MaterializedField.create(field);
         ValueVector vector = oldFields.remove(fieldDef.getName());
 
@@ -105,7 +107,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
           // Field did not exist previously--is schema change.
           schemaChanged = true;
           vector = TypeHelper.getNewVector(fieldDef, allocator);
-        } else if (!vector.getField().getType().equals(fieldDef.getType())) {
+        } else if (! vector.getField().getType().equals(fieldDef.getType())) {
           // Field had different type before--is schema change.
           // clear previous vector
           vector.clear();
@@ -125,7 +127,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
         }
 
         // Load the vector.
-        if (field.getValueCount() == 0) {
+        if (buf == null) {
+          // Schema only
+        } else if (field.getValueCount() == 0) {
           AllocationHelper.allocate(vector, 0, 0, 0);
         } else {
           vector.load(field, buf.slice(bufOffset, field.getBufferLength()));
@@ -151,9 +155,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
       }
       throw cause;
     } finally {
-      if (!oldFields.isEmpty()) {
+      if (! oldFields.isEmpty()) {
         schemaChanged = true;
-        for (final ValueVector vector:oldFields.values()) {
+        for (final ValueVector vector : oldFields.values()) {
           vector.clear();
         }
       }
@@ -269,5 +273,4 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     container.clear();
     resetRecordCount();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index e1a1031..67b2522 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -105,9 +105,6 @@ public class SchemaUtil {
         if (field.getType().getMinorType() == MinorType.UNION) {
           UnionVector u = (UnionVector) tp.getTo();
           for (MinorType t : field.getType().getSubTypeList()) {
-            if (u.getField().getType().getSubTypeList().contains(t)) {
-              continue;
-            }
             u.addSubType(t);
           }
         }
@@ -116,22 +113,7 @@ public class SchemaUtil {
         ValueVector newVector = TypeHelper.getNewVector(field, allocator);
         Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
         UnionVector u = (UnionVector) newVector;
-        final ValueVector vv = u.addVector(tp.getTo());
-        MinorType type = v.getField().getType().getMinorType();
-        for (int i = 0; i < valueCount; i++) {
-          if (!vv.getAccessor().isNull(i)) {
-            u.getMutator().setType(i, type);
-          } else {
-            u.getMutator().setType(i, MinorType.LATE);
-          }
-        }
-        for (MinorType t : field.getType().getSubTypeList()) {
-          if (u.getField().getType().getSubTypeList().contains(t)) {
-            continue;
-          }
-          u.addSubType(t);
-        }
-        u.getMutator().setValueCount(valueCount);
+        u.setFirstType(tp.getTo(), valueCount);
         return u;
       }
     } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 9564f11..c46efaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,9 +18,6 @@
 package org.apache.drill.exec.record;
 
 import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -42,13 +39,14 @@ import com.google.common.collect.Sets;
 
 public class VectorContainer implements VectorAccessible {
 
+  private final BufferAllocator allocator;
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
   private BatchSchema schema;
   private int recordCount = -1;
-  private BufferAllocator allocator;
   private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema
 
   public VectorContainer() {
+    allocator = null;
   }
 
   public VectorContainer(OperatorContext oContext) {
@@ -336,9 +334,13 @@ public class VectorContainer implements VectorAccessible {
   }
 
   public void clear() {
-    schema = null;
     zeroVectors();
+    removeAll();
+  }
+
+  public void removeAll() {
     wrappers.clear();
+    schema = null;
   }
 
   public void setRecordCount(int recordCount) {
@@ -365,13 +367,17 @@ public class VectorContainer implements VectorAccessible {
 
   /**
    * Clears the contained vectors.  (See {@link ValueVector#clear}).
+   * Note that the name <tt>zeroVector()</tt> in a value vector is
+   * used for the action to set all vectors to zero. Here it means
+   * to free the vector's memory. Sigh...
    */
+
   public void zeroVectors() {
     VectorAccessibleUtilities.clear(this);
   }
 
   public int getNumberOfColumns() {
-    return this.wrappers.size();
+    return wrappers.size();
   }
 
   public void allocateNew() {
@@ -415,4 +421,30 @@ public class VectorContainer implements VectorAccessible {
     merged.schemaChanged = false;
     return merged;
   }
+
+  /**
+   * Exchange buffers between two identical vector containers.
+   * The schemas must be identical in both column schemas and
+   * order. That is, after this call, data is exchanged between
+   * the containers. Requires that both containers be owned
+   * by the same allocator.
+   *
+   * @param other the target container with buffers to swap
+   */
+
+  public void exchange(VectorContainer other) {
+    assert schema.isEquivalent(other.schema);
+    assert wrappers.size() == other.wrappers.size();
+    assert allocator != null  &&  allocator == other.allocator;
+    for (int i = 0; i < wrappers.size(); i++) {
+      wrappers.get(i).getValueVector().exchange(
+          other.wrappers.get(i).getValueVector());
+    }
+    int temp = recordCount;
+    recordCount = other.recordCount;
+    other.recordCount = temp;
+    boolean temp2 = schemaChanged;
+    schemaChanged = other.schemaChanged;
+    other.schemaChanged = temp2;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index b3b46c2..c806669 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -177,7 +177,7 @@ public class WritableBatch implements AutoCloseable {
     return b;
   }
 
-  public static WritableBatch get(RecordBatch batch) {
+  public static WritableBatch get(VectorAccessible batch) {
     if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
       throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
     }
@@ -198,5 +198,4 @@ public class WritableBatch implements AutoCloseable {
       drillBuf.release(1);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 42f3473..7244148 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -86,11 +86,11 @@ public class SelectionVector2 implements AutoCloseable {
   }
 
   public void setBuffer(DrillBuf bufferHandle) {
-      /* clear the existing buffer */
-      clear();
+    /* clear the existing buffer */
+    clear();
 
-      this.buffer = bufferHandle;
-      buffer.retain(1);
+    this.buffer = bufferHandle;
+    buffer.retain(1);
   }
 
   public char getIndex(int index) {
@@ -106,7 +106,7 @@ public class SelectionVector2 implements AutoCloseable {
   }
 
   public void setIndex(int index, int value) {
-    buffer.setChar(index, value);
+    buffer.setChar(index * RECORD_SIZE, value);
   }
 
   public boolean allocateNewSafe(int size) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index bd077fb..b51fdca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,10 +20,10 @@ package org.apache.drill.exec.record.selection;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.DeadBuf;
 
 public class SelectionVector4 implements AutoCloseable {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
 
   private ByteBuf data;
   private int recordCount;
@@ -31,8 +31,9 @@ public class SelectionVector4 implements AutoCloseable {
   private int length;
 
   public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
-    if (recordCount > Integer.MAX_VALUE /4) {
-      throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size.  You requested an allocation of %d bytes.", recordCount * 4));
+    if (recordCount > Integer.MAX_VALUE / 4) {
+      throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
+          "You requested an allocation of %d bytes.", recordCount * 4L));
     }
     this.recordCount = recordCount;
     this.start = 0;
@@ -40,6 +41,17 @@ public class SelectionVector4 implements AutoCloseable {
     this.data = vector;
   }
 
+  public SelectionVector4(BufferAllocator allocator, int recordCount) {
+    if (recordCount > Integer.MAX_VALUE / 4) {
+      throw new IllegalStateException(String.format("Currently, Drill can only support allocations up to 2gb in size. " +
+          "You requested an allocation of %d bytes.", recordCount * 4L));
+    }
+    this.recordCount = recordCount;
+    this.start = 0;
+    this.length = recordCount;
+    this.data = allocator.buffer(recordCount * 4);
+  }
+
   public int getTotalCount() {
     return recordCount;
   }
@@ -54,15 +66,15 @@ public class SelectionVector4 implements AutoCloseable {
   }
 
   public void set(int index, int compound) {
-    data.setInt(index*4, compound);
+    data.setInt(index * 4, compound);
   }
 
   public void set(int index, int recordBatch, int recordIndex) {
-    data.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535));
+    data.setInt(index * 4, (recordBatch << 16) | (recordIndex & 65535));
   }
 
   public int get(int index) {
-    return data.getInt( (start+index)*4);
+    return data.getInt((start+index) * 4);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 4b71b0f..f9d44cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,24 +17,25 @@
  */
 package org.apache.drill.exec.store;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.Path;
 
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 
 public class ColumnExplorer {
 
@@ -46,13 +47,12 @@ public class ColumnExplorer {
   private final Map<String, ImplicitFileColumns> allImplicitColumns;
   private final Map<String, ImplicitFileColumns> selectedImplicitColumns;
 
-
   /**
    * Helper class that encapsulates logic for sorting out columns
    * between actual table columns, partition columns and implicit file columns.
    * Also populates map with implicit columns names as keys and their values
    */
-  public ColumnExplorer(FragmentContext context, List<SchemaPath> columns) {
+  public ColumnExplorer(FragmentContextInterface context, List<SchemaPath> columns) {
     this(context.getOptions(), columns);
   }
 
@@ -62,7 +62,7 @@ public class ColumnExplorer {
    * Also populates map with implicit columns names as keys and their values
    */
   public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
-    this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     this.columns = columns;
     this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
     this.selectedPartitionColumns = Lists.newArrayList();
@@ -74,7 +74,8 @@ public class ColumnExplorer {
   }
 
   /**
-   * Creates case insensitive map with implicit file columns as keys and appropriate ImplicitFileColumns enum as values
+   * Creates case insensitive map with implicit file columns as keys and
+   * appropriate ImplicitFileColumns enum as values
    */
   public static Map<String, ImplicitFileColumns> initImplicitFileColumns(OptionManager optionManager) {
     Map<String, ImplicitFileColumns> map = CaseInsensitiveMap.newHashMap();
@@ -94,8 +95,8 @@ public class ColumnExplorer {
    * @param column column
    * @return true if given column is partition, false otherwise
    */
-  public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column){
-    String partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+  public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column) {
+    String partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     String path = column.getRootSegmentPath();
     return isPartitionColumn(partitionDesignator, path);
   }
@@ -252,11 +253,11 @@ public class ColumnExplorer {
       this.name = name;
     }
 
+    public String optionName() { return name; }
+
     /**
      * Using file path calculates value for each implicit file column
      */
     public abstract String getValue(Path path);
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java
index 98e460a..1aa278a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java
@@ -96,5 +96,4 @@ public class ResourceInputStream extends ByteArrayInputStream implements Seekabl
       throw new EOFException();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 489e03c..e97316c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.dfs;
 
-import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -39,13 +38,14 @@ import java.util.EnumSet;
 public class DrillFSDataInputStream extends FSDataInputStream {
   private final FSDataInputStream underlyingIs;
   private final OpenFileTracker openFileTracker;
-  private final OperatorStatReceiver operatorStats;
+  private final OperatorStats operatorStats;
 
-  public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats) throws IOException {
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
     this(in, operatorStats, null);
   }
 
-  public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats,
+  @SuppressWarnings("resource")
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
       OpenFileTracker openFileTracker) throws IOException {
     super(new WrappedInputStream(in, operatorStats));
     underlyingIs = in;
@@ -194,9 +194,9 @@ public class DrillFSDataInputStream extends FSDataInputStream {
    */
   private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable {
     final FSDataInputStream is;
-    final OperatorStatReceiver operatorStats;
+    final OperatorStats operatorStats;
 
-    WrappedInputStream(FSDataInputStream is, OperatorStatReceiver operatorStats) {
+    WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) {
       this.is = is;
       this.operatorStats = operatorStats;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index fc540aa..52e1a96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.drill.exec.ops.OperatorStatReceiver;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -80,14 +80,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
 
   private final FileSystem underlyingFs;
-  private final OperatorStatReceiver operatorStats;
+  private final OperatorStats operatorStats;
   private final CompressionCodecFactory codecFactory;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
     this(fsConf, null);
   }
 
-  public DrillFileSystem(Configuration fsConf, OperatorStatReceiver operatorStats) throws IOException {
+  public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
     this.underlyingFs = FileSystem.get(fsConf);
     this.codecFactory = new CompressionCodecFactory(fsConf);
     this.operatorStats = operatorStats;

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
index 80bcef2..587201e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,11 +17,8 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
-
 public interface FileWork {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileWork.class);
-
-  public String getPath();
-  public long getStart();
-  public long getLength();
+  String getPath();
+  long getStart();
+  long getLength();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
index 8910c26..ef8f861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.drill.common.exceptions.UserException;
+
 import com.google.common.base.Charsets;
 
 /**
@@ -67,23 +69,6 @@ public class HeaderBuilder extends TextOutput {
 
   public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
 
-  /**
-   * Exception that reports header errors. Is an unchecked exception
-   * to avoid cluttering the normal field reader interface.
-   */
-  public static class HeaderError extends RuntimeException {
-
-    private static final long serialVersionUID = 1L;
-
-    public HeaderError(String msg) {
-      super(msg);
-    }
-
-    public HeaderError(int colIndex, String msg) {
-      super("Column " + (colIndex + 1) + ": " + msg);
-    }
-  }
-
   public final List<String> headers = new ArrayList<>();
   public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
 
@@ -204,14 +189,18 @@ public class HeaderBuilder extends TextOutput {
     try {
       currentField.put(data);
     } catch (BufferOverflowException e) {
-      throw new HeaderError(headers.size(), "Column exceeds maximum length of " + MAX_HEADER_LEN);
+      throw UserException.dataReadError()
+        .message("Column exceeds maximum length of %d", MAX_HEADER_LEN)
+        .build(logger);
     }
   }
 
   @Override
   public void finishRecord() {
     if (headers.isEmpty()) {
-      throw new HeaderError("The file must define at least one header.");
+      throw UserException.dataReadError()
+        .message("The file must define at least one header.")
+        .build(logger);
     }
 
     // Force headers to be unique.

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
index d218846..7a7ad0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -372,15 +372,18 @@ final class TextReader {
         throw new TextParsingException(context, "Cannot use newline character within quoted string");
       }
 
-      if(success){
+      if (success) {
         if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) {
           context.stop();
         }
         return true;
-      }else{
+      } else {
         return false;
       }
 
+    } catch (UserException ex) {
+      stopParsing();
+      throw ex;
     } catch (StreamFinishedPseudoException ex) {
       stopParsing();
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index eadbeb0..a611c6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -34,7 +34,6 @@ import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -431,7 +430,7 @@ public class Metadata {
     List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
-    ALL_COLS.add(Utilities.STAR_COLUMN);
+    ALL_COLS.add(SchemaPath.STAR_COLUMN);
     boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
     ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 773f3d3..3935919 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -34,7 +34,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
@@ -226,7 +225,7 @@ public class ParquetSchema {
     for (int i = 0; i < columnsFound.length; i++) {
       SchemaPath col = projectedColumns.get(i);
       assert col != null;
-      if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) {
+      if ( ! columnsFound[i] && ! col.equals(SchemaPath.STAR_COLUMN)) {
         nullFilledVectors.add(createMissingColumn(col, output));
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 35358c2..9125e96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 
 public class Utilities {
 
-  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
   public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
 
   public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
@@ -87,7 +86,7 @@ public class Utilities {
     return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
       @Override
       public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+        return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN);
       }
     }).isPresent();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java
index 2611b86..d85d75b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/sql/TimePrintMillis.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,8 +19,8 @@ package org.apache.drill.exec.vector.accessor.sql;
 
 import java.sql.Time;
 
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
 
+@SuppressWarnings("serial")
 public class TimePrintMillis extends Time {
   private static final String[] leadingZeroes = {"", "0", "00"};
 
@@ -33,7 +33,7 @@ public class TimePrintMillis extends Time {
 
   @Override
   public String toString () {
-    int millis = (int) (getTime() % DateUtility.secondsToMillis);
+    int millis = (int) (getTime() % org.apache.drill.exec.vector.DateUtilities.secondsToMillis);
     StringBuilder time = new StringBuilder().append(super.toString());
 
     if (millis > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
index bf1448e..fec9e66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.drill.exec.vector.complex.fn;
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -30,6 +29,7 @@ import org.apache.drill.exec.expr.holders.TimeHolder;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.DateUtilities;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
@@ -258,9 +258,9 @@ abstract class VectorOutput {
       IntervalWriter intervalWriter = writer.interval();
       if(!isNull){
         final Period p = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString());
-        int months = DateUtility.monthsFromPeriod(p);
+        int months = DateUtilities.monthsFromPeriod(p);
         int days = p.getDays();
-        int millis = DateUtility.millisFromPeriod(p);
+        int millis = DateUtilities.periodToMillis(p);
         intervalWriter.writeInterval(months, days, millis);
       }
     }
@@ -295,6 +295,7 @@ abstract class VectorOutput {
       return innerRun();
     }
 
+    @SuppressWarnings("resource")
     @Override
     public void writeBinary(boolean isNull) throws IOException {
       VarBinaryWriter bin = writer.varBinary(fieldName);
@@ -326,6 +327,7 @@ abstract class VectorOutput {
 
     @Override
     public void writeTime(boolean isNull) throws IOException {
+      @SuppressWarnings("resource")
       TimeWriter t = writer.time(fieldName);
       if(!isNull){
         DateTimeFormatter f = ISODateTimeFormat.time();
@@ -333,6 +335,7 @@ abstract class VectorOutput {
       }
     }
 
+    @SuppressWarnings("resource")
     @Override
     public void writeTimestamp(boolean isNull) throws IOException {
       TimeStampWriter ts = writer.timeStamp(fieldName);
@@ -359,15 +362,16 @@ abstract class VectorOutput {
       IntervalWriter intervalWriter = writer.interval(fieldName);
       if(!isNull){
         final Period p = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString());
-        int months = DateUtility.monthsFromPeriod(p);
+        int months = DateUtilities.monthsFromPeriod(p);
         int days = p.getDays();
-        int millis = DateUtility.millisFromPeriod(p);
+        int millis = DateUtilities.periodToMillis(p);
         intervalWriter.writeInterval(months, days, millis);
       }
     }
 
     @Override
     public void writeInteger(boolean isNull) throws IOException {
+      @SuppressWarnings("resource")
       BigIntWriter intWriter = writer.bigInt(fieldName);
       if(!isNull){
         intWriter.writeBigInt(Long.parseLong(parser.getValueAsString()));

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index 34c8c6c..22cd618 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -255,8 +255,11 @@ public class TestStarQueries extends BaseTestQuery {
   public void testStarView1() throws Exception {
     test("use dfs.tmp");
     test("create view vt1 as select * from cp.`tpch/region.parquet` r, cp.`tpch/nation.parquet` n where r.r_regionkey = n.n_regionkey");
-    test("select * from vt1");
-    test("drop view vt1");
+    try {
+      test("select * from vt1");
+    } finally {
+      test("drop view vt1");
+    }
   }
 
   @Test  // select star for a SchemaTable.
@@ -271,9 +274,12 @@ public class TestStarQueries extends BaseTestQuery {
         "join (select * from cp.`tpch/nation.parquet`) t2 " +
         "on t1.name = t2.n_name";
 
-    test("alter session set `planner.enable_broadcast_join` = false");
-    test(query);
-    test("alter session set `planner.enable_broadcast_join` = true");
+    try {
+      alterSession("planner.enable_broadcast_join", false);
+      test(query);
+    } finally {
+      resetSessionOption("planner.enable_broadcast_join");
+    }
     test(query);
   }