You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:48 UTC

[01/10] git commit: DRILL-317: StorageEngineRegistry creates a new StorageEngine every time getEngine() is called

Updated Branches:
  refs/heads/master 3e2000341 -> 4a83dae35


DRILL-317: StorageEngineRegistry creates a new StorageEngine every time getEngine() is called


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/622aad0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/622aad0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/622aad0f

Branch: refs/heads/master
Commit: 622aad0fa976608bbf95a573091fa4429d9b0bf5
Parents: 3e20003
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:33:34 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:33:34 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/store/StorageEngineRegistry.java    | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/622aad0f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index bd4efcd..4cc7346 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -28,12 +28,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.server.DrillbitContext;
 
 public class StorageEngineRegistry {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
-  
+
   private Map<Object, Constructor<? extends StorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends StorageEngine>>();
   private Map<StorageEngineConfig, StorageEngine> activeEngines = new HashMap<StorageEngineConfig, StorageEngine>();
 
@@ -42,7 +41,7 @@ public class StorageEngineRegistry {
     init(context.getConfig());
     this.context = context;
   }
-  
+
   @SuppressWarnings("unchecked")
   public void init(DrillConfig config){
     Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
@@ -63,21 +62,23 @@ public class StorageEngineRegistry {
       }
     }
   }
-  
-  public StorageEngine getEngine(StorageEngineConfig engineConfig) throws ExecutionSetupException{
+
+  public synchronized StorageEngine getEngine(StorageEngineConfig engineConfig) throws ExecutionSetupException{
     StorageEngine engine = activeEngines.get(engineConfig);
     if(engine != null) return engine;
     Constructor<? extends StorageEngine> c = availableEngines.get(engineConfig.getClass());
     if(c == null) throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
     try {
-      return c.newInstance(engineConfig, context);
+      engine = c.newInstance(engineConfig, context);
+      activeEngines.put(engineConfig, engine);
+      return engine;
     } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
       Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
       if(t instanceof ExecutionSetupException) throw ((ExecutionSetupException) t);
       throw new ExecutionSetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
     }
   }
-  
 
-  
+
+
 }


[03/10] git commit: DRILL-312: Modularize org.apache.drill.exec.physical.impl.ImplCreator using operator creator registry

Posted by ja...@apache.org.
DRILL-312: Modularize org.apache.drill.exec.physical.impl.ImplCreator using operator creator registry


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b91f2e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b91f2e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b91f2e8a

Branch: refs/heads/master
Commit: b91f2e8a837d7079c305442b41a3f3ef20b9846f
Parents: ba5e652
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:37:40 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:37:40 2013 -0800

----------------------------------------------------------------------
 .../exceptions/ExecutionSetupException.java     |  10 ++
 .../drill/exec/physical/impl/ImplCreator.java   | 175 ++-----------------
 .../physical/impl/OperatorCreatorRegistry.java  | 100 +++++++++++
 .../drill/exec/server/DrillbitContext.java      |  17 +-
 .../apache/drill/exec/client/DumpCatTest.java   |   6 +-
 .../exec/fn/impl/TestRepeatedFunction.java      |   2 +
 .../physical/impl/TestComparisonFunctions.java  |   1 +
 .../exec/physical/impl/TestSimpleFunctions.java |   5 +
 .../drill/exec/physical/impl/agg/TestAgg.java   |   2 +
 .../physical/impl/filter/TestSimpleFilter.java  |   3 +
 .../exec/physical/impl/join/TestMergeJoin.java  |   5 +
 .../physical/impl/limit/TestSimpleLimit.java    |   6 +
 .../impl/project/TestSimpleProjection.java      |   2 +
 .../exec/physical/impl/sort/TestSimpleSort.java |   3 +
 .../physical/impl/svremover/TestSVRemover.java  |   2 +
 .../impl/trace/TestTraceMultiRecordBatch.java   |   4 +-
 .../impl/trace/TestTraceOutputDump.java         |   4 +-
 .../physical/impl/union/TestSimpleUnion.java    |   4 +
 18 files changed, 177 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java b/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
index 2e50ae5..ae647f1 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
@@ -17,9 +17,19 @@
  */
 package org.apache.drill.common.exceptions;
 
+import java.lang.reflect.InvocationTargetException;
+
 
 public class ExecutionSetupException extends DrillException{
+  private static final long serialVersionUID = -6943409010231014085L;
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionSetupException.class);
+
+  public static ExecutionSetupException fromThrowable(String message, Throwable cause) {
+    Throwable t = cause instanceof InvocationTargetException 
+        ? ((InvocationTargetException)cause).getTargetException() : cause;
+    if(t instanceof ExecutionSetupException) return ((ExecutionSetupException) t);
+    return new ExecutionSetupException(message, t);
+  }
   
   public ExecutionSetupException() {
     super();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 3e4c1eb..2dc5f16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -25,43 +24,8 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.IteratorValidator;
-import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.config.MergingReceiverPOP;
-import org.apache.drill.exec.physical.config.OrderedPartitionSender;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SelectionVectorRemover;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.Trace;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
-import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
-import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
-import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionSenderCreator;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
-import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
-import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
-import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
-import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
-import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
-import org.apache.drill.exec.physical.impl.validate.IteratorValidatorCreator;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.json.JSONScanBatchCreator;
-import org.apache.drill.exec.store.json.JSONSubScan;
-import org.apache.drill.exec.store.mock.MockScanBatchCreator;
-import org.apache.drill.exec.store.mock.MockSubScanPOP;
-import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -69,138 +33,38 @@ import com.google.common.collect.Lists;
 /**
  * Implementation of the physical operator visitor
  */
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
-  // Element creators supported by this visitor
-  private MockScanBatchCreator msc = new MockScanBatchCreator();
-  private ParquetScanBatchCreator parquetScan = new ParquetScanBatchCreator();
-  private ScreenCreator sc = new ScreenCreator();
-  private MergingReceiverCreator mrc = new MergingReceiverCreator();
-  private RandomReceiverCreator rrc = new RandomReceiverCreator();
-  private PartitionSenderCreator hsc = new PartitionSenderCreator();
-  private OrderedPartitionSenderCreator opsc = new OrderedPartitionSenderCreator();
-  private SingleSenderCreator ssc = new SingleSenderCreator();
-  private ProjectBatchCreator pbc = new ProjectBatchCreator();
-  private FilterBatchCreator fbc = new FilterBatchCreator();
-  private LimitBatchCreator lbc = new LimitBatchCreator();
-  private UnionBatchCreator unionbc = new UnionBatchCreator();
-  private SVRemoverCreator svc = new SVRemoverCreator();
-  private SortBatchCreator sbc = new SortBatchCreator();
-  private AggBatchCreator abc = new AggBatchCreator();
-  private MergeJoinCreator mjc = new MergeJoinCreator();
-  private IteratorValidatorCreator ivc = new IteratorValidatorCreator();
   private RootExec root = null;
-  private TraceBatchCreator tbc = new TraceBatchCreator();
 
   private ImplCreator(){}
 
-  public RootExec getRoot(){
+  private RootExec getRoot(){
     return root;
   }
 
   @Override
-  public RecordBatch visitProject(Project op, FragmentContext context) throws ExecutionSetupException {
-    return pbc.getBatch(context, op, getChildren(op, context));
-  }
-
-  @Override
-  public RecordBatch visitTrace(Trace op, FragmentContext context) throws ExecutionSetupException {
-    return tbc.getBatch(context, op, getChildren(op, context));
-  }
-  @Override
-  public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
-    Preconditions.checkNotNull(subScan);
+  @SuppressWarnings("unchecked")
+  public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(op);
     Preconditions.checkNotNull(context);
 
-    if (subScan instanceof MockSubScanPOP) {
-      return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
-    } else if (subScan instanceof JSONSubScan) {
-      return new JSONScanBatchCreator().getBatch(context, (JSONSubScan) subScan, Collections.<RecordBatch> emptyList());
-    } else if (subScan instanceof ParquetRowGroupScan) {
-      return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
-    } else {
-      return super.visitSubScan(subScan, context);
-    }
-
-  }
-
-  @Override
-  public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    if (op instanceof SelectionVectorRemover) {
-      return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
+    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(op.getClass());
+    if (opCreator != null) {
+      if (op instanceof FragmentRoot ) {
+        root = ((RootCreator<PhysicalOperator>)opCreator).getRoot(context, op, getChildren(op, context));
+        return null;
+      } else {
+        return ((BatchCreator<PhysicalOperator>)opCreator).getBatch(context, op, getChildren(op, context));
+      }
     } else {
-      return super.visitOp(op, context);
+      throw new UnsupportedOperationException(String.format(
+          "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.",
+          this.getClass().getCanonicalName(), op.getClass().getCanonicalName()));
     }
   }
 
-  @Override
-  public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
-    return sbc.getBatch(context, sort, getChildren(sort, context));
-  }
-
-  @Override
-  public RecordBatch visitLimit(Limit limit, FragmentContext context) throws ExecutionSetupException {
-    return lbc.getBatch(context, limit, getChildren(limit, context));
-  }
-
-  @Override
-  public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
-    return mjc.getBatch(context, op, getChildren(op, context));
-  }
-
-  @Override
-  public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
-    Preconditions.checkArgument(root == null);
-    root = sc.getRoot(context, op, getChildren(op, context));
-    return null;
-  }
-
-  @Override
-  public RecordBatch visitHashPartitionSender(HashPartitionSender op, FragmentContext context) throws ExecutionSetupException {
-    root = hsc.getRoot(context, op, getChildren(op, context));
-    return null;
-  }
-
-  @Override
-  public RecordBatch visitOrderedPartitionSender(OrderedPartitionSender op, FragmentContext context) throws ExecutionSetupException {
-    root = opsc.getRoot(context, op, getChildren(op, context));
-    return null;
-  }
-  
-  @Override
-  public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
-    return fbc.getBatch(context, filter, getChildren(filter, context));
-  }
-
-
-  @Override
-  public RecordBatch visitStreamingAggregate(StreamingAggregate config, FragmentContext context)
-      throws ExecutionSetupException {
-    return abc.getBatch(context, config, getChildren(config, context));
-  }
-
-  @Override
-  public RecordBatch visitUnion(Union union, FragmentContext context) throws ExecutionSetupException {
-    return unionbc.getBatch(context, union, getChildren(union, context));
-  }
-
-  @Override
-  public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
-    root = ssc.getRoot(context, op, getChildren(op, context));
-    return null;
-  }
-
-  @Override
-  public RecordBatch visitRandomReceiver(RandomReceiver op, FragmentContext context) throws ExecutionSetupException {
-    return rrc.getBatch(context, op, null);
-  }
-
-  @Override
-  public RecordBatch visitMergingReceiver(MergingReceiverPOP op, FragmentContext context) throws ExecutionSetupException {
-    return mrc.getBatch(context, op, null);
-  }
-
   private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
     List<RecordBatch> children = Lists.newArrayList();
     for (PhysicalOperator child : op) {
@@ -209,11 +73,6 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     return children;
   }
 
-  @Override
-  public RecordBatch visitIteratorValidator(IteratorValidator op, FragmentContext context) throws ExecutionSetupException {
-    return ivc.getBatch(context, op, getChildren(op, context));
-  }
-  
   public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     ImplCreator i = new ImplCreator();
     boolean isAssertEnabled = false;
@@ -227,6 +86,4 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
           "The provided fragment did not have a root node that correctly created a RootExec value.");
     return i.getRoot();
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
new file mode 100644
index 0000000..8c768e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.PathScanner;
+
+public class OperatorCreatorRegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCreatorRegistry.class);
+
+  private volatile Map<Class<?>, Constructor<?>> constructorRegistry = new HashMap<Class<?>, Constructor<?>>();
+  private volatile Map<Class<?>, Object> instanceRegistry = new HashMap<Class<?>, Object>();
+
+  public OperatorCreatorRegistry(DrillConfig config) {
+    addImplemntorsToMap(config, BatchCreator.class);
+    addImplemntorsToMap(config, RootCreator.class);
+    logger.debug("Adding Operator Creator map: {}", constructorRegistry);
+  }
+
+  public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
+    Object opCreator = instanceRegistry.get(operator);
+    if (opCreator != null) return opCreator;
+
+    Constructor<?> c = constructorRegistry.get(operator);
+    if(c == null) {
+      throw new ExecutionSetupException(
+          String.format("Failure finding OperatorCreator constructor for config %s", operator.getCanonicalName()));
+    }
+    try {
+      opCreator = c.newInstance();
+      instanceRegistry.put(operator, opCreator);
+      return opCreator;
+    } catch (Throwable t) {
+      throw ExecutionSetupException.fromThrowable(
+          String.format("Failure creating OperatorCreator for Operator %s", operator.getCanonicalName()), t);
+    }
+  }
+
+  private <T> void addImplemntorsToMap(DrillConfig config, Class<T> baseInterface) {
+    Class<?>[] providerClasses = PathScanner.scanForImplementationsArr(baseInterface,
+        config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
+    for (Class<?> c : providerClasses) {
+      Class<?> operatorClass = c;
+      boolean interfaceFound = false;
+      while (!interfaceFound && !(c.equals(java.lang.Object.class))) {
+        Type[] ifaces = c.getGenericInterfaces(); // never returns null
+        for (Type iface : ifaces) {
+          if (!(iface instanceof ParameterizedType
+              && ((ParameterizedType)iface).getRawType().equals(baseInterface))) {
+            continue;
+          }
+          Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
+          interfaceFound = true;
+          boolean constructorFound = false;
+          for(Constructor<?> constructor : operatorClass.getConstructors()){
+            Class<?>[] params = constructor.getParameterTypes();
+            if(params.length == 0){
+              Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
+              if (old != null) {
+                throw new RuntimeException(
+                    String.format("Duplicate OperatorCreator [%s, %s] found for PhysicalOperator %s",
+                    old.getDeclaringClass().getCanonicalName(), operatorClass.getCanonicalName(),
+                    ((Class<?>) args[0]).getCanonicalName()));
+              }
+              constructorFound = true;
+            }
+          }
+          if(!constructorFound){
+            logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
+                operatorClass.getCanonicalName());
+          }
+        }
+        c = c.getSuperclass();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 080fd70..4656590 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -24,21 +24,18 @@ import java.util.Collection;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.StorageEngineRegistry;
 
-import com.google.common.base.Preconditions;
 import com.codahale.metrics.MetricRegistry;
-
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import com.google.common.base.Preconditions;
 
 public class DrillbitContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -51,6 +48,7 @@ public class DrillbitContext {
   private final DistributedCache cache;
   private final DrillbitEndpoint endpoint;
   private final StorageEngineRegistry storageEngineRegistry;
+  private final OperatorCreatorRegistry operatorCreatorRegistry;
   
   public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) {
     super();
@@ -65,6 +63,7 @@ public class DrillbitContext {
     this.endpoint = endpoint;
     this.storageEngineRegistry = new StorageEngineRegistry(this);
     this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
+    this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
   }
   
   public DrillbitEndpoint getEndpoint(){
@@ -82,7 +81,11 @@ public class DrillbitContext {
   public BufferAllocator getAllocator(){
     return context.getAllocator();
   }
-  
+
+  public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+    return operatorCreatorRegistry;
+  }
+
   public StorageEngine getStorageEngine(StorageEngineConfig config) throws ExecutionSetupException {
     return storageEngineRegistry.getEngine(config);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index a78ffc3..0ecab3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -27,24 +27,21 @@ import mockit.NonStrictExpectations;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecConstants;
-
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
-
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
@@ -71,6 +68,7 @@ public class DumpCatTest {
           bitContext.getMetrics(); result = new MetricRegistry();
           bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
           bitContext.getConfig(); result = c;
+          bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
       }};
 
       PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
index 3ec9492..aa67ba5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -59,6 +60,7 @@ public class TestRepeatedFunction {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 54bf0fd..4dea33d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -56,6 +56,7 @@ public class TestComparisonFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index 1776d8d..411f21c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -59,6 +59,7 @@ public class TestSimpleFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -86,6 +87,7 @@ public class TestSimpleFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -113,6 +115,7 @@ public class TestSimpleFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -152,6 +155,7 @@ public class TestSimpleFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -191,6 +195,7 @@ public class TestSimpleFunctions {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
index b18ef71..4eae66f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -54,6 +55,7 @@ public class TestAgg {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index 14d68f3..d1c756f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -56,6 +57,7 @@ public class TestSimpleFilter {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     
@@ -80,6 +82,7 @@ public class TestSimpleFilter {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 09b7ebe..41cb034 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -63,6 +64,7 @@ public class TestMergeJoin {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -112,6 +114,7 @@ public class TestMergeJoin {
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
       bitContext.getConfig(); result = c;
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
@@ -164,6 +167,7 @@ public class TestMergeJoin {
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
       bitContext.getConfig(); result = c;
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
@@ -216,6 +220,7 @@ public class TestMergeJoin {
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
       bitContext.getConfig(); result = c;
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 89d909d..b254fc0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -20,9 +20,11 @@ package org.apache.drill.exec.physical.impl.limit;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import com.codahale.metrics.MetricRegistry;
+
 import junit.framework.Assert;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -30,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -50,6 +53,7 @@ public class TestSimpleLimit {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     verifyLimitCount(bitContext, connection, "test1.json", 5);
@@ -60,6 +64,7 @@ public class TestSimpleLimit {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     verifyLimitCount(bitContext, connection, "test3.json", 95);
@@ -70,6 +75,7 @@ public class TestSimpleLimit {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
 
     verifyLimitCount(bitContext, connection, "test2.json", 69999);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index ca38d9c..72dbe6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -59,6 +60,7 @@ public class TestSimpleProjection {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
index 426aa3a..4472668 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -58,6 +59,7 @@ public class TestSimpleSort {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     
@@ -105,6 +107,7 @@ public class TestSimpleSort {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index 2da96d7..5d2b67c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -58,6 +59,7 @@ public class TestSVRemover {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index dd7d006..a3d923e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.trace;
 
 import static org.junit.Assert.*;
-
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
@@ -30,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -37,7 +37,6 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
-
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
@@ -65,6 +64,7 @@ public class TestTraceMultiRecordBatch {
             bitContext.getMetrics(); result = new MetricRegistry();
             bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
             bitContext.getConfig(); result = c;
+            bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
         }};
 
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index f4e6180..7643cf8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.trace;
 
 import static org.junit.Assert.*;
-
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
@@ -31,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -40,7 +40,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,6 +78,7 @@ public class TestTraceOutputDump {
             bitContext.getMetrics(); result = new MetricRegistry();
             bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
             bitContext.getConfig(); result = c;
+            bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
         }};
 
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index f74c66f..fe90ad4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.physical.impl.union;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import com.codahale.metrics.MetricRegistry;
+
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -29,6 +31,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -54,6 +57,7 @@ public class TestSimpleUnion {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry();
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
     }};
     
     


[02/10] git commit: DRILL-311: Replace OrderedPartitionBatchCreator with OrderedPartitionSenderCreator

Posted by ja...@apache.org.
DRILL-311: Replace OrderedPartitionBatchCreator with OrderedPartitionSenderCreator


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ba5e6520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ba5e6520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ba5e6520

Branch: refs/heads/master
Commit: ba5e65207bac38519bc199ed95535932abab2908
Parents: 622aad0
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:35:46 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:35:46 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ImplCreator.java   | 27 +++++++----
 .../OrderedPartitionBatchCreator.java           | 39 ----------------
 .../OrderedPartitionSenderCreator.java          | 47 ++++++++++++++++++++
 .../PartitionSenderRootExec.java                |  2 +-
 4 files changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index efc0f5b..3e4c1eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,12 +26,27 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
 import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionBatchCreator;
+import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -64,10 +79,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private MergingReceiverCreator mrc = new MergingReceiverCreator();
   private RandomReceiverCreator rrc = new RandomReceiverCreator();
   private PartitionSenderCreator hsc = new PartitionSenderCreator();
-  private OrderedPartitionBatchCreator opc = new OrderedPartitionBatchCreator();
+  private OrderedPartitionSenderCreator opsc = new OrderedPartitionSenderCreator();
   private SingleSenderCreator ssc = new SingleSenderCreator();
   private ProjectBatchCreator pbc = new ProjectBatchCreator();
-  private OrderedPartitionBatchCreator smplbc = new OrderedPartitionBatchCreator();
   private FilterBatchCreator fbc = new FilterBatchCreator();
   private LimitBatchCreator lbc = new LimitBatchCreator();
   private UnionBatchCreator unionbc = new UnionBatchCreator();
@@ -150,10 +164,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
 
   @Override
   public RecordBatch visitOrderedPartitionSender(OrderedPartitionSender op, FragmentContext context) throws ExecutionSetupException {
-    List<RecordBatch> children = Lists.newArrayList();
-    children.add(opc.getBatch(context, op, getChildren(op, context)));
-    HashPartitionSender config = new HashPartitionSender(op.getOppositeMajorFragmentId(), op, op.getRef(),op.getDestinations());
-    root = hsc.getRoot(context, config, children);
+    root = opsc.getRoot(context, op, getChildren(op, context));
     return null;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
deleted file mode 100644
index 615cf21..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.orderedpartitioner;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.OrderedPartitionSender;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import java.util.List;
-
-public class OrderedPartitionBatchCreator implements BatchCreator<OrderedPartitionSender>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionBatchCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.size() == 1);
-    return new OrderedPartitionRecordBatch(config, children.iterator().next(), context);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
new file mode 100644
index 0000000..c0ba8f9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.orderedpartitioner;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.impl.RootCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> {
+
+  @Override
+  public RootExec getRoot(FragmentContext context, OrderedPartitionSender config,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+
+    List<RecordBatch> ordered_children = Lists.newArrayList();
+    ordered_children.add(new OrderedPartitionRecordBatch(config, children.iterator().next(), context));
+    HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations());
+    return new PartitionSenderRootExec(context, ordered_children.iterator().next(), hpc);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 19adee7..bc53bd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -45,7 +45,7 @@ import com.sun.codemodel.JMod;
 import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
-class PartitionSenderRootExec implements RootExec {
+public class PartitionSenderRootExec implements RootExec {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private RecordBatch incoming;


[05/10] DRILL-274: Spooling batch buffer

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json b/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
new file mode 100644
index 0000000..143331d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
@@ -0,0 +1,48 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+              {records: 1000000, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "red", type: "BIGINT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "union-exchange"
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "union-exchange"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "union-exchange"
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "filter",
+            expr: "red < 9000000000000000000"
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file


[10/10] git commit: DRILL-293: Sort operator throws NPE on empty record batch

Posted by ja...@apache.org.
DRILL-293: Sort operator throws NPE on empty record batch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4a83dae3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4a83dae3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4a83dae3

Branch: refs/heads/master
Commit: 4a83dae35f8782a9737b31723769724f3b274894
Parents: 366bf8e
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:11:27 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:11:27 2013 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/physical/impl/sort/SortBatch.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a83dae3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 9cefce1..7881115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -134,10 +134,11 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
         }
       }
       
-      if (builder == null)
+      if (builder == null){
         // builder may be null at this point if the first incoming batch is empty
         return IterOutcome.NONE;
-
+      }
+        
       builder.build(context);
       sv4 = builder.getSv4();
 


[07/10] git commit: DRILL-303: RecordBatchLoader.load always creates new schema

Posted by ja...@apache.org.
DRILL-303: RecordBatchLoader.load always creates new schema


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b12c0b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b12c0b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b12c0b15

Branch: refs/heads/master
Commit: b12c0b155698a3d2ecd5cf3bfdf994fccd65f8d6
Parents: 2c811a8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:06:05 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:06:05 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchLoader.java    | 17 +++---
 .../exec/physical/impl/TestUnionExchange.java   | 62 ++++++++++++++++++++
 .../test/resources/sender/union_exchange.json   | 47 +++++++++++++++
 3 files changed, 116 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 016f340..f19184f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -62,10 +62,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     this.valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
 
-    Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
+    Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
     for(VectorWrapper<?> w : container){
       ValueVector v = w.getValueVector();
-      oldFields.put(v.getField(), v);
+      oldFields.put(v.getField().getDef(), v);
     }
     
     VectorContainer newVectors = new VectorContainer();
@@ -76,15 +76,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     for (FieldMetadata fmd : fields) {
       FieldDef fieldDef = fmd.getDef();
       ValueVector v = oldFields.remove(fieldDef);
-      if(v != null){
-        container.add(v);
-        continue;
+      if(v == null) {
+        // if we arrive here, we didn't have a matching vector.
+        schemaChanged = true;
+        MaterializedField m = new MaterializedField(fieldDef);
+        v = TypeHelper.getNewVector(m, allocator);
       }
-
-      // if we arrive here, we didn't have a matching vector.
-      schemaChanged = true;
-      MaterializedField m = new MaterializedField(fieldDef);
-      v = TypeHelper.getNewVector(m, allocator);
       if (fmd.getValueCount() == 0){
         v.clear();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
new file mode 100644
index 0000000..2e16b47
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestUnionExchange extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionExchange.class);
+
+  @Test
+  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+          Files.toString(FileUtils.getResourceAsFile("/sender/union_exchange.json"),
+              Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(150, count);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/resources/sender/union_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/union_exchange.json b/exec/java-exec/src/test/resources/sender/union_exchange.json
new file mode 100644
index 0000000..76963bd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/union_exchange.json
@@ -0,0 +1,47 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+              {records: 100, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "red", type: "BIGINT", mode: "REQUIRED"}
+              ]},
+              {records: 200, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "red", type: "BIGINT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "union-exchange"
+        },
+        {
+             @id: 3,
+             child: 2,
+             pop: "filter",
+             expr: "alternate()"
+         },
+         {
+             @id: 4,
+             child: 3,
+             pop:"selection-vector-remover"
+         },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file


[08/10] git commit: DRILL-301: Join two tables hit IndexOutOfBoundsException

Posted by ja...@apache.org.
DRILL-301: Join two tables hit IndexOutOfBoundsException


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/316ce8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/316ce8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/316ce8a6

Branch: refs/heads/master
Commit: 316ce8a6f8f94c31574e7107be26addcfc92dc7f
Parents: b12c0b1
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:07:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:07:45 2013 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinTemplate.java   | 28 +++++++++++---------
 .../exec/physical/impl/join/JoinWorker.java     |  2 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 20 ++++++++------
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index b7fdbf3..aae1a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -83,8 +83,9 @@ public abstract class JoinTemplate implements JoinWorker {
   /**
    * Copy rows from the input record batches until the output record batch is full
    * @param status  State of the join operation (persists across multiple record batches/schema changes)
+   * @return  true if the join succeeded; false if the worker needs to be regenerated
    */
-  public final void doJoin(final JoinStatus status) {
+  public final boolean doJoin(final JoinStatus status) {
     while (true) {
       // for each record
 
@@ -93,14 +94,15 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining values from the left batch
           while (status.isLeftPositionAllowed()) {
-            doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+            if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
+              return false;
             status.advanceLeft();
           }
         }
-        return;
+        return true;
       }
       if (!status.isLeftPositionAllowed())
-        return;
+        return true;
 
       int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
       switch (comparison) {
@@ -108,7 +110,9 @@ public abstract class JoinTemplate implements JoinWorker {
       case -1:
         // left key < right key
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
-          doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+          if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
+            return false;
+          }
         status.advanceLeft();
         continue;
 
@@ -133,10 +137,10 @@ public abstract class JoinTemplate implements JoinWorker {
         do {
           // copy all equal right keys to the output record batch
           if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
-            return;
+            return false;
 
           if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
-            return;
+            return false;
           
           // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
           // right table's record batch to the sv4 builder before calling next.  These records will need to be
@@ -167,7 +171,7 @@ public abstract class JoinTemplate implements JoinWorker {
             status.ok = false;
           }
           // return to indicate recompile in right-sv4 mode
-          return;
+          return true;
         }
 
         continue;
@@ -193,8 +197,8 @@ public abstract class JoinTemplate implements JoinWorker {
   /**
    * Copy the data to the new record batch (if it fits).
    *
-   * @param leftPosition  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
-   * @param outputPosition position of the output record batch
+   * @param leftIndex  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
+   * @param outIndex position of the output record batch
    * @return Whether or not the data was copied.
    */
   public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
@@ -205,8 +209,8 @@ public abstract class JoinTemplate implements JoinWorker {
    * Compare the values of the left and right join key to determine whether the left is less than, greater than
    * or equal to the right.
    *
-   * @param leftPosition
-   * @param rightPosition
+   * @param leftIndex
+   * @param rightIndex
    * @return  0 if both keys are equal
    *         -1 if left is < right
    *          1 if left is > right

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 4374cef..8643d66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -30,7 +30,7 @@ public interface JoinWorker {
   }
 
   public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
-  public void doJoin(JoinStatus status);
+  public boolean doJoin(JoinStatus status);
   
   public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1e20e91..f3a32cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -166,7 +166,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       }
 
       // join until we have a complete outgoing batch
-      worker.doJoin(status);
+      if (!worker.doJoin(status))
+        worker = null;
 
       // get the outcome of the join.
       switch(status.getOutcome()){
@@ -353,10 +354,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                        new TypedFieldId(vw.getField().getType(),vectorId));
       // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
+      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
                                    .arg(COPY_LEFT_MAPPING.getValueReadIndex())
                                    .arg(COPY_LEFT_MAPPING.getValueWriteIndex())
-                                   .arg(vvIn));
+                                   .arg(vvIn).eq(JExpr.FALSE))
+          ._then()
+          ._return(JExpr.FALSE);
       ++vectorId;
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
@@ -372,10 +375,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                        new TypedFieldId(vw.getField().getType(),vectorId));
       // todo: check result of copyFromSafe and grow allocation
-      cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
-          .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
-          .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
-          .arg(vvIn));
+      cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+                                 .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+                                 .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+                                 .arg(vvIn).eq(JExpr.FALSE))
+          ._then()
+          ._return(JExpr.FALSE);
       ++vectorId;
     }
     cg.getEvalBlock()._return(JExpr.lit(true));
@@ -388,7 +393,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private void allocateBatch() {
     // allocate new batch space.
     container.clear();
-
     // add fields from both batches
     for (VectorWrapper<?> w : left) {
       ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());


[06/10] git commit: DRILL-274: Spooling batch buffer

Posted by ja...@apache.org.
DRILL-274: Spooling batch buffer


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2c811a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2c811a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2c811a83

Branch: refs/heads/master
Commit: 2c811a83b30295fb39e1540ce76fbf54768ed50c
Parents: 6c0389f
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:04:44 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:04:44 2013 -0800

----------------------------------------------------------------------
 distribution/src/resources/drill-override.conf  |  35 ++-
 .../org/apache/drill/exec/ExecConstants.java    |   5 +
 .../apache/drill/exec/ops/FragmentContext.java  |  10 +-
 .../exec/physical/impl/WireRecordBatch.java     |  15 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  15 +-
 .../exec/record/RawFragmentBatchProvider.java   |   4 +-
 .../exec/store/LocalSyncableFileSystem.java     | 183 ++++++++++++++
 .../work/batch/AbstractFragmentCollector.java   |  22 +-
 .../drill/exec/work/batch/BatchCollector.java   |   4 +-
 .../exec/work/batch/BitComHandlerImpl.java      |   2 +-
 .../drill/exec/work/batch/IncomingBuffers.java  |  24 +-
 .../drill/exec/work/batch/MergingCollector.java |   5 +-
 .../exec/work/batch/PartitionedCollector.java   |   5 +-
 .../drill/exec/work/batch/RawBatchBuffer.java   |   4 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java | 250 +++++++++++++++++++
 .../work/batch/UnlimitedRawBatchBuffer.java     |   4 +
 .../work/foreman/RunningFragmentManager.java    |   6 +-
 .../work/fragment/RemoteFragmentHandler.java    |   5 +-
 .../src/main/resources/drill-module.conf        |  11 +-
 .../apache/drill/exec/client/DumpCatTest.java   |   2 +-
 .../exec/fn/impl/TestRepeatedFunction.java      |   2 +-
 .../physical/impl/TestComparisonFunctions.java  |   2 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   4 +-
 .../exec/physical/impl/TestSimpleFunctions.java |  10 +-
 .../drill/exec/physical/impl/agg/TestAgg.java   |   2 +-
 .../physical/impl/filter/TestSimpleFilter.java  |   4 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |   8 +-
 .../physical/impl/limit/TestSimpleLimit.java    |   4 +-
 .../impl/project/TestSimpleProjection.java      |   2 +-
 .../exec/physical/impl/sort/TestSimpleSort.java |   4 +-
 .../physical/impl/svremover/TestSVRemover.java  |   2 +-
 .../impl/trace/TestTraceMultiRecordBatch.java   |   2 +-
 .../impl/trace/TestTraceOutputDump.java         |   2 +-
 .../physical/impl/union/TestSimpleUnion.java    |   2 +-
 .../apache/drill/exec/work/batch/FileTest.java  |  66 +++++
 .../exec/work/batch/TestSpoolingBuffer.java     |  62 +++++
 .../src/test/resources/drill-module.conf        |   4 +-
 .../test/resources/drill-spool-test-module.conf |  83 ++++++
 .../resources/work/batch/multiple_exchange.json |  48 ++++
 39 files changed, 851 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index 18a2a6a..5fe2362 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -16,6 +16,9 @@
 //  This file tells Drill to consider this module when class path scanning.  
 //  This file can also include any supplementary configuration information.  
 //  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
@@ -46,11 +49,19 @@ drill.exec: {
   optimizer: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
   },
+  functions: ["org.apache.drill.expr.fn.impl"],
   storage: {
-	packages += "org.apache.drill.exec.store"
-  }
-  metrics : {
-    context: "drillbit"
+    packages += "org.apache.drill.exec.store"  
+  },
+  metrics : { 
+    context: "drillbit",
+    jmx: {
+      enabled : true
+    },
+    log: {
+      enabled : false,
+      interval : 60
+    }
   },
   zk: {
 	connect: "localhost:2181",
@@ -60,7 +71,7 @@ drill.exec: {
   	retry: {
   	  count: 7200,
   	  delay: 500
-  	}
+  	}    
   },
   functions: ["org.apache.drill.expr.fn.impl"],
   network: {
@@ -70,8 +81,18 @@ drill.exec: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
     executor.threads: 4
-  }
+  },
   trace: {
-    directory: "/tmp"
+    directory: "/var/log/drill",
+    filesystem: "file:///"
+  },
+  tmp: {
+    directories: ["/tmp/drill"],
+    filesystem: "drill-local:///"
+  },
+  spooling: {
+    impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+    delete: false,
+    size: 100000000
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 36504f6..5336c0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -45,4 +45,9 @@ public interface ExecConstants {
   public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
   public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
   public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
+  public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
+  public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
+  public static final String SPOOLING_BUFFER_IMPL = "drill.exec.spooling.impl";
+  public static final String SPOOLING_BUFFER_DELETE = "drill.exec.spooling.delete";
+  public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.spooling.size";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 674dafc..f75cf5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
@@ -57,14 +58,14 @@ public class FragmentContext {
   public final Timer fragmentTime;
   private final FragmentHandle handle;
   private final UserClientConnection connection;
-  private final IncomingBuffers buffers;
+  private IncomingBuffers buffers;
   private volatile Throwable failureCause;
   private volatile boolean failed = false;
   private final FunctionImplementationRegistry funcRegistry;
   private final QueryClassLoader loader;
   private final ClassTransformer transformer;
   
-  public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers, FunctionImplementationRegistry funcRegistry) {
+  public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) {
     this.loader = new QueryClassLoader(true);
     this.transformer = new ClassTransformer();
     this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
@@ -74,10 +75,13 @@ public class FragmentContext {
     this.context = dbContext;
     this.connection = connection;
     this.handle = handle;
-    this.buffers = buffers;
     this.funcRegistry = funcRegistry;
   }
 
+  public void setBuffers(IncomingBuffers buffers) {
+    this.buffers = buffers;
+  }
+
   public void fail(Throwable cause) {
     logger.debug("Fragment Context received failure. {}", cause);
     failed = true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 0b0214a..6a16367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
@@ -96,14 +97,14 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public IterOutcome next() {
-    RawFragmentBatch batch = fragProvider.getNext();
+    try{
+      RawFragmentBatch batch = fragProvider.getNext();
     
-    // skip over empty batches. we do this since these are basically control messages.
-    while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
-      batch = fragProvider.getNext();
-    }
+      // skip over empty batches. we do this since these are basically control messages.
+      while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
+        batch = fragProvider.getNext();
+      }
     
-    try{
       if (batch == null) return IterOutcome.NONE;
       
 
@@ -119,7 +120,7 @@ public class WireRecordBatch implements RecordBatch{
       }else{
         return IterOutcome.OK;
       }
-    }catch(SchemaChangeException ex){
+    }catch(SchemaChangeException | IOException ex){
       context.fail(ex);
       return IterOutcome.STOP;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 6d8a284..fd392a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -132,7 +132,13 @@ public class MergingRecordBatch implements RecordBatch {
       // set up each (non-empty) incoming record batch
       List<RawFragmentBatch> rawBatches = Lists.newArrayList();
       for (RawFragmentBatchProvider provider : fragProviders) {
-        RawFragmentBatch rawBatch = provider.getNext();
+        RawFragmentBatch rawBatch = null;
+        try {
+          rawBatch = provider.getNext();
+        } catch (IOException e) {
+          context.fail(e);
+          return IterOutcome.STOP;
+        }
         if (rawBatch.getHeader().getDef().getRecordCount() != 0)
           rawBatches.add(rawBatch);
       }
@@ -226,7 +232,12 @@ public class MergingRecordBatch implements RecordBatch {
 
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
-        incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+        try {
+          incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+        } catch (IOException e) {
+          context.fail(e);
+          return IterOutcome.STOP;
+        }
 
         if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
             incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index 3390af9..6f5f7a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -19,9 +19,11 @@ package org.apache.drill.exec.record;
 
 import org.apache.drill.exec.ops.FragmentContext;
 
+import java.io.IOException;
+
 public interface RawFragmentBatchProvider {
   
-  public RawFragmentBatch getNext();
+  public RawFragmentBatch getNext() throws IOException;
   public void kill(FragmentContext context);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
new file mode 100644
index 0000000..10a4dc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class provides a Syncable local extension of the hadoop FileSystem.
+ * Paths are used directly as java.io.File paths on the local disk. Append,
+ * rename, listStatus and positional reads are not supported and throw IOException.
+ */
+public class LocalSyncableFileSystem extends FileSystem {
+
+  /** Returns the fixed URI (drill-local:///) identifying this file system. */
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("drill-local:///");
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Opens the local file at path for reading; the buffer size argument is ignored. */
+  @Override
+  public FSDataInputStream open(Path path, int i) throws IOException {
+    return new FSDataInputStream(new LocalInputStream(path));
+  }
+
+  /** Creates the local file at path for writing; every argument other than path is ignored. */
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i2, long l, Progressable progressable) throws IOException {
+    return new FSDataOutputStream(new LocalSyncableOutputStream(path));
+  }
+
+  /** Not supported; always throws IOException. */
+  @Override
+  public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+    throw new IOException("Append is not supported in LocalSyncableFilesystem");
+  }
+
+  /** Not supported; always throws IOException. */
+  @Override
+  public boolean rename(Path path, Path path2) throws IOException {
+    throw new IOException("Rename not supported");
+  }
+
+  /** Deletes the file (or empty directory) at path and returns whether it was removed. */
+  @Override
+  public boolean delete(Path path) throws IOException {
+    File file = new File(path.toString());
+    return file.delete();
+  }
+
+  /**
+   * Deletes the file or directory at path.
+   *
+   * @param path file or directory to delete
+   * @param b    true to delete a directory together with its contents
+   * @return true if the directory tree was removed, or the result of File.delete() for a plain file
+   * @throws IOException if path is a directory and b is false, or the directory tree cannot be deleted
+   */
+  @Override
+  public boolean delete(Path path, boolean b) throws IOException {
+    File file = new File(path.toString());
+    if (file.isDirectory()) {
+      if (!b) {
+        throw new IOException("Cannot delete directory");
+      }
+      FileUtils.deleteDirectory(file);
+      return true;
+    }
+    // Report the real outcome rather than unconditionally claiming success.
+    return file.delete();
+  }
+
+  /** Not supported; always throws IOException. */
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    throw new IOException("listStatus not supported");
+  }
+
+  /** No-op: this file system does not track a working directory. */
+  @Override
+  public void setWorkingDirectory(Path path) {
+  }
+
+  /** Always returns null; no working directory is tracked. */
+  @Override
+  public Path getWorkingDirectory() {
+    return null;
+  }
+
+  /** Creates the directory at path, including missing parents; the permission argument is ignored. */
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+    return new File(path.toString()).mkdirs();
+  }
+
+  /** Not implemented; always returns null. */
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return null;
+  }
+
+  /**
+   * Buffered output stream over a local file whose sync() flushes the buffer
+   * and forces the written bytes to the underlying device.
+   */
+  public class LocalSyncableOutputStream extends OutputStream implements Syncable {
+    private FileOutputStream fos;
+    private BufferedOutputStream output;
+
+    /**
+     * Opens path for writing, creating its parent directory if necessary.
+     *
+     * @throws FileNotFoundException if the parent directory cannot be created or the file cannot be opened
+     */
+    public LocalSyncableOutputStream(Path path) throws FileNotFoundException {
+      File dir = new File(path.getParent().toString());
+      if (!dir.exists()) {
+        boolean success = dir.mkdirs();
+        if (!success) {
+          throw new FileNotFoundException("failed to create parent directory");
+        }
+      }
+      fos = new FileOutputStream(new File(path.toString()));
+      output = new BufferedOutputStream(fos, 64*1024);
+    }
+
+    @Override
+    public void sync() throws IOException {
+      output.flush();
+      fos.getFD().sync();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      output.write(b);
+    }
+
+    /** Writes the range in one call instead of the inherited byte-at-a-time loop. */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      output.write(b, off, len);
+    }
+
+    /** Pushes buffered bytes to the file; the inherited flush() would do nothing. */
+    @Override
+    public void flush() throws IOException {
+      output.flush();
+    }
+
+    /** Flushes buffered bytes and closes the file; the inherited close() would leak both. */
+    @Override
+    public void close() throws IOException {
+      output.close();
+    }
+  }
+
+  /**
+   * Input stream over a local file. Seeking is supported; positional reads,
+   * getPos and seekToNewSource are not and throw IOException.
+   */
+  public class LocalInputStream extends InputStream implements Seekable, PositionedReadable {
+
+    private FileInputStream input;
+
+    public LocalInputStream(Path path)  throws IOException {
+      input = new FileInputStream(path.toString());
+    }
+
+    @Override
+    public int read(long l, byte[] bytes, int i, int i2) throws IOException {
+      throw new IOException("unsupported operation");
+    }
+
+    @Override
+    public void readFully(long l, byte[] bytes, int i, int i2) throws IOException {
+      throw new IOException("unsupported operation");
+    }
+
+    @Override
+    public void readFully(long l, byte[] bytes) throws IOException {
+      throw new IOException("unsupported operation");
+    }
+
+    /** Moves the read position to byte offset l from the start of the file. */
+    @Override
+    public void seek(long l) throws IOException {
+      // FileInputStream does not support mark/reset, so reposition through its channel.
+      input.getChannel().position(l);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      throw new IOException("getPos not supported");
+    }
+
+    @Override
+    public boolean seekToNewSource(long l) throws IOException {
+      throw new IOException("seekToNewSource not supported");
+    }
+
+    /** Returns the next byte (0-255), or -1 at end of file. */
+    @Override
+    public int read() throws IOException {
+      return input.read();
+    }
+
+    /** Reads up to len bytes in one call instead of the inherited byte-at-a-time loop. */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return input.read(b, off, len);
+    }
+
+    /** Releases the underlying file descriptor; the inherited close() would leak it. */
+    @Override
+    public void close() throws IOException {
+      input.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index d58de2f..7023373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -17,10 +17,17 @@
  */
 package org.apache.drill.exec.work.batch;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -38,7 +45,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
   private final AtomicInteger parentAccounter;
   private final AtomicInteger finishedStreams = new AtomicInteger();
   
-  public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired) {
+  public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
     Preconditions.checkArgument(minInputsRequired > 0);
     Preconditions.checkNotNull(receiver);
     Preconditions.checkNotNull(parentAccounter);
@@ -48,8 +55,15 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
     this.remainders = new AtomicIntegerArray(incoming.size());
     this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
     this.buffers = new RawBatchBuffer[minInputsRequired];
-    for(int i = 0; i < buffers.length; i++){
-      buffers[i] = new UnlimitedRawBatchBuffer();
+    try {
+      String bufferClassName = context.getConfig().getString(ExecConstants.SPOOLING_BUFFER_IMPL);
+      Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class);
+      for(int i = 0; i < buffers.length; i++) {
+          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context);
+      }
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+            NoSuchMethodException | ClassNotFoundException e) {
+      context.fail(e);
     }
     if (receiver.supportsOutOfOrderExchange()) {
       this.remainingRequired = new AtomicInteger(1);
@@ -68,7 +82,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
   
   public abstract void streamFinished(int minorFragmentId);
   
-  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch)  throws IOException {
     boolean decremented = false;
     if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
       int rem = remainingRequired.decrementAndGet();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index 236b239..539393c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -21,10 +21,12 @@ package org.apache.drill.exec.work.batch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
+import java.io.IOException;
+
 
 interface BatchCollector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
-  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) throws IOException ;
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 5639851..5d20026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -111,7 +111,7 @@ public class BitComHandlerImpl implements BitComHandler {
   @Override
   public void startNewRemoteFragment(PlanFragment fragment){
     logger.debug("Received remote fragment start instruction", fragment);
-    FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
+    FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
     BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
     RemoteFragmentRunnerListener listener = new RemoteFragmentRunnerListener(context, tunnel);
     try{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index c9e5608..992d9a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,11 +17,13 @@
  */
 package org.apache.drill.exec.work.batch;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Receiver;
@@ -41,12 +43,14 @@ public class IncomingBuffers {
   private final AtomicInteger streamsRemaining = new AtomicInteger(0);
   private final AtomicInteger remainingRequired = new AtomicInteger(0);
   private final Map<Integer, BatchCollector> fragCounts;
+  private final FragmentContext context;
 
-  public IncomingBuffers(PhysicalOperator root) {
+  public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
+    this.context = context;
     Map<Integer, BatchCollector> counts = Maps.newHashMap();
     CountRequiredFragments reqFrags = new CountRequiredFragments();
     root.accept(reqFrags, counts);
-    
+
     logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(), counts);
     fragCounts = ImmutableMap.copyOf(counts);
 
@@ -68,10 +72,14 @@ public class IncomingBuffers {
     int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
     BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
     if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting.  The id was %d.", sendMajorFragmentId));
-    boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
-    
-    // we should only return true if remaining required has been decremented and is currently equal to zero.
-    return decremented && remainingRequired.get() == 0;
+    try {
+      boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
+
+      // we should only return true if remaining required has been decremented and is currently equal to zero.
+      return decremented && remainingRequired.get() == 0;
+    } catch (IOException e) {
+      throw new FragmentSetupException(e);
+    }
   }
 
   public int getRemainingRequired() {
@@ -94,9 +102,9 @@ public class IncomingBuffers {
     public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
       BatchCollector set;
       if (receiver.supportsOutOfOrderExchange()) {
-        set = new MergingCollector(remainingRequired, receiver);
+        set = new MergingCollector(remainingRequired, receiver, context);
       } else {
-        set = new PartitionedCollector(remainingRequired, receiver);
+        set = new PartitionedCollector(remainingRequired, receiver, context);
       }
       
       counts.put(set.getOppositeMajorFragmentId(), set);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 670347c..1c92bbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -19,14 +19,15 @@ package org.apache.drill.exec.work.batch;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.Receiver;
 
 public class MergingCollector extends AbstractFragmentCollector{
 
   private AtomicInteger streamsRunning;
   
-  public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
-    super(parentAccounter, receiver, 1);
+  public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+    super(parentAccounter, receiver, 1, context);
     streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index af12778..f998eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -19,12 +19,13 @@ package org.apache.drill.exec.work.batch;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.Receiver;
 
 public class PartitionedCollector extends AbstractFragmentCollector{
 
-  public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver) {
-    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size());
+  public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 82ed1ca..e7d3d06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -21,10 +21,12 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
 import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
+import java.io.IOException;
+
 public interface RawBatchBuffer extends RawFragmentBatchProvider{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawBatchBuffer.class);
   
-  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);
+  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException;
   
   /**
    * Inform the buffer that no more records are expected.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
new file mode 100644
index 0000000..fa20b3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Queues;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This implementation of RawBatchBuffer starts writing incoming batches to disk once the buffer size reaches a threshold.
+ * The order of the incoming buffers is maintained.
+ */
+public class SpoolingRawBatchBuffer implements RawBatchBuffer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
+
+  private static String HADOOP_FILESYSTEM_DEFAULT_NAME = "fs.default.name";
+  private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
+  private static final float STOP_SPOOLING_FRACTION = (float) 0.5;
+
+  private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
+  private volatile boolean finished = false;
+  private volatile long queueSize = 0;
+  private long threshold;
+  private FragmentContext context;
+  private volatile AtomicBoolean spooling = new AtomicBoolean(false);
+  private FileSystem fs;
+  private Path path;
+  private FSDataOutputStream outputStream;
+  private FSDataInputStream inputStream;
+
+  public SpoolingRawBatchBuffer(FragmentContext context) throws IOException {
+    this.context = context;
+    this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_FILESYSTEM_DEFAULT_NAME, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
+    conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
+    this.fs = FileSystem.get(conf);
+    this.path = new Path(getDir(), getFileName());
+  }
+
+  public static List<String> DIRS = DrillConfig.create().getStringList(ExecConstants.TEMP_DIRECTORIES);
+
+  public static String getDir() {
+    Random random = new Random();
+    return DIRS.get(random.nextInt(DIRS.size()));
+  }
+
+  @Override
+  public synchronized void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException {
+    RawFragmentBatchWrapper wrapper;
+    boolean spool = spooling.get();
+    wrapper = new RawFragmentBatchWrapper(batch, !spool);
+    queueSize += wrapper.getBodySize();
+    if (spool) {
+      if (outputStream == null) {
+        outputStream = fs.create(path);
+      }
+      wrapper.writeToStream(outputStream);
+    }
+    buffer.add(wrapper);
+    if (!spool && queueSize > threshold) {
+      logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", queueSize, threshold);
+      spooling.set(true);
+    }
+  }
+
+  @Override
+  public void kill(FragmentContext context) {
+    cleanup();
+  }
+
+  
+  @Override
+  public void finished() {
+    finished = true;
+  }
+
+  /**
+   * Returns the next queued batch, reading it back from the spool file if it was spooled.
+   *
+   * If the queue is empty and {@code finished} has not been set, this call blocks until a batch
+   * arrives. Once the remaining queued size drops below {@code threshold * STOP_SPOOLING_FRACTION}
+   * while spooling was active at entry, spooling is switched off.
+   *
+   * @return the next batch, or {@code null} (after running cleanup) when the queue is empty and
+   *         finished, or when the waiting thread is interrupted
+   * @throws IOException if reading a spooled batch fails
+   */
+  @Override
+  public RawFragmentBatch getNext() throws IOException {
+    boolean spool = spooling.get();
+    RawFragmentBatchWrapper w = buffer.poll();
+    if (w == null && !finished) {
+      try {
+        w = buffer.take();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so callers further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        cleanup();
+        return null;
+      }
+    }
+    if (w == null) {
+      cleanup();
+      return null;
+    }
+
+    // Single dequeue path: batches obtained by blocking get the same accounting and
+    // stop-spooling check as batches obtained by polling.
+    // NOTE(review): enqueue() is synchronized but this method is not, and both update queueSize.
+    RawFragmentBatch batch = w.get();
+    queueSize -= w.getBodySize();
+    assert queueSize >= 0;
+    if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
+      logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
+      spooling.set(false);
+    }
+    return batch;
+  }
+
+  /**
+   * Closes the spool streams and, when {@code ExecConstants.SPOOLING_BUFFER_DELETE} is enabled,
+   * deletes the spool file. Failures are logged and never propagated.
+   */
+  private void cleanup() {
+    // Close each stream in its own try block so a failure on one does not leak the other.
+    try {
+      if (outputStream != null) {
+        outputStream.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Failed to cleanup I/O streams", e);
+    }
+    try {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Failed to cleanup I/O streams", e);
+    }
+    if (context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
+      try {
+        // Only report a deletion when the file system says one happened.
+        if (fs.delete(path, false)) {
+          logger.debug("Deleted file {}", path.toString());
+        } else {
+          logger.debug("File {} was not deleted; it may never have been created", path.toString());
+        }
+      } catch (IOException e) {
+        logger.warn("Failed to delete temporary files", e);
+      }
+    }
+  }
+
+  /**
+   * Holds one queued batch, either directly in memory ({@code available == true}) or as a record
+   * previously written to the spool file that must be read back before use.
+   */
+  private class RawFragmentBatchWrapper {
+    private RawFragmentBatch batch;
+    private boolean available;
+    private CountDownLatch latch = new CountDownLatch(1);
+    // Number of body bytes written to the spool file for this batch; 0 when the batch had no body.
+    private int bodyLength;
+
+    /**
+     * @param batch the batch to wrap; must not be null
+     * @param available whether the batch can be handed out without reading from the spool file
+     */
+    public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
+      Preconditions.checkNotNull(batch);
+      this.batch = batch;
+      this.available = available;
+    }
+
+    public boolean isNull() {
+      return batch == null;
+    }
+
+    /**
+     * Returns the wrapped batch, first reading it back from the spool file (opening the shared
+     * input stream on first use) when it is not available in memory.
+     *
+     * @throws IOException if the spool file cannot be opened or read
+     */
+    public RawFragmentBatch get() throws IOException {
+      if (!available) {
+        if (inputStream == null) {
+          inputStream = fs.open(path);
+        }
+        // readFromStream replaces the batch and marks it available.
+        readFromStream(inputStream);
+      }
+      return batch;
+    }
+
+    /**
+     * @return the readable byte count of the batch body, or 0 when the batch has no body
+     */
+    public long getBodySize() {
+      if (batch.getBody() == null) {
+        return 0;
+      }
+      assert batch.getBody().readableBytes() >= 0;
+      return batch.getBody().readableBytes();
+    }
+
+    /**
+     * Writes the batch header (length-delimited) followed by the body bytes to the stream, syncs
+     * the stream, and releases the in-memory body. The batch is marked not available.
+     *
+     * @throws IOException if writing or syncing fails
+     */
+    public void writeToStream(FSDataOutputStream stream) throws IOException {
+      Stopwatch watch = new Stopwatch();
+      watch.start();
+      available = false;
+      batch.getHeader().writeDelimitedTo(stream);
+      ByteBuf buf = batch.getBody();
+      if (buf == null) {
+        // NOTE(review): header-only batches return before stream.sync(); confirm that is intended.
+        bodyLength = 0;
+        return;
+      }
+      bodyLength = buf.readableBytes();
+      // Start at the reader index so the bytes written are exactly the readable ones counted above.
+      buf.getBytes(buf.readerIndex(), stream, bodyLength);
+      stream.sync();
+      long t = watch.elapsed(TimeUnit.MICROSECONDS);
+      // Clamp the divisor: an elapsed time of 0 us would otherwise throw ArithmeticException.
+      logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / Math.max(t, 1L));
+      buf.release();
+    }
+
+    /**
+     * Reads one header and {@code bodyLength} body bytes from the stream, replaces the wrapped
+     * batch with the result, and marks it available.
+     *
+     * @throws IOException if parsing or reading fails, or the stream ends before the full body
+     *         has been read
+     */
+    public void readFromStream(FSDataInputStream stream) throws IOException {
+      Stopwatch watch = new Stopwatch();
+      watch.start();
+      ExecProtos.FragmentRecordBatch header = ExecProtos.FragmentRecordBatch.parseDelimitedFrom(stream);
+      ByteBuf buf = context.getAllocator().buffer(bodyLength);
+      // A single writeBytes call may transfer fewer bytes than requested; loop until complete.
+      int remaining = bodyLength;
+      while (remaining > 0) {
+        int read = buf.writeBytes(stream, remaining);
+        if (read < 0) {
+          buf.release();
+          throw new IOException("Unexpected end of spool file " + path + ": " + remaining + " of "
+              + bodyLength + " body bytes missing");
+        }
+        remaining -= read;
+      }
+      batch = new RawFragmentBatch(header, buf);
+      available = true;
+      latch.countDown();
+      long t = watch.elapsed(TimeUnit.MICROSECONDS);
+      // Clamp the divisor: an elapsed time of 0 us would otherwise throw ArithmeticException.
+      logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / Math.max(t, 1L));
+    }
+  }
+
+  /**
+   * Builds the spool file name from the fragment handle of the current context, in the form
+   * {@code <queryId>_<majorFragmentId>_<minorFragmentId>}.
+   *
+   * @return the file name for this fragment's spool file
+   */
+  private String getFileName() {
+    final ExecProtos.FragmentHandle fragment = context.getHandle();
+    return String.format("%s_%s_%s",
+        QueryIdHelper.getQueryId(fragment.getQueryId()),
+        fragment.getMajorFragmentId(),
+        fragment.getMinorFragmentId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 43870da..64012c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -31,6 +31,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
   private volatile boolean finished = false;
+
+  public UnlimitedRawBatchBuffer(FragmentContext context) {
+    // Nothing to initialize here: the context argument is accepted but not used by this buffer.
+  }
   
   @Override
   public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 588316b..2cb57dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -78,9 +78,9 @@ class RunningFragmentManager implements FragmentStatusListener{
 
     // set up the root fragment first so we'll have incoming buffers available.
     {
-      IncomingBuffers buffers = new IncomingBuffers(rootOperator);
-      
-      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+      IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+      rootContext.setBuffers(buffers);
       RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
       // add fragment to local node.
       map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index d947d68..6157229 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -56,8 +56,9 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
     try{
       this.fragment = fragment;
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      this.buffers = new IncomingBuffers(root);
-      this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
+      this.context = new FragmentContext(context, fragment.getHandle(), null, new FunctionImplementationRegistry(context.getConfig()));
+      this.buffers = new IncomingBuffers(root, this.context);
+      this.context.setBuffers(buffers);
       this.runnerListener = new RemoteFragmentRunnerListener(this.context, foremanTunnel);
       this.reader = context.getPlanReader();
       

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 725c6b4..b84c406 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -66,9 +66,18 @@ drill.exec: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
     executor.threads: 4
-  }
+  },
   trace: {
     directory: "/var/log/drill",
     filesystem: "file:///"
+  },
+  tmp: {
+    directories: ["/tmp/drill"],
+    filesystem: "drill-local:///"
+  },
+  spooling: {
+    impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+    delete: false,
+    size: 100000000
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index 0ecab3a..c06818d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -74,7 +74,7 @@ public class DumpCatTest {
       PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
       PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
       FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-      FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+      FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
       SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
       while(exec.next()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
index aa67ba5..8f56464 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
@@ -67,7 +67,7 @@ public class TestRepeatedFunction {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_repeated_1.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     
     boolean oneIsOne = false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 4dea33d..ea0ac2c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -62,7 +62,7 @@ public class TestComparisonFunctions {
     String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression);
     if(reader == null) reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     if(registry == null) registry = new FunctionImplementationRegistry(c);
-    if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     PhysicalPlan plan = reader.readPhysicalPlan(planString);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 05d57be..99ab362 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -99,7 +99,7 @@ public class TestOptiqPlans {
 
     
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(fctxt, (FragmentRoot) pp.getSortedOperators(false).iterator().next()));
     return exec;
     
@@ -278,7 +278,7 @@ public class TestOptiqPlans {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), reg);
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     return exec;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index 411f21c..d92d9fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -65,7 +65,7 @@ public class TestSimpleFunctions {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNull.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){
@@ -93,7 +93,7 @@ public class TestSimpleFunctions {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNotNull.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){
@@ -121,7 +121,7 @@ public class TestSimpleFunctions {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstring.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){
@@ -161,7 +161,7 @@ public class TestSimpleFunctions {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstringNegative.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){
@@ -201,7 +201,7 @@ public class TestSimpleFunctions {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testByteSubstring.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
index 4eae66f..89f3292 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
@@ -61,7 +61,7 @@ public class TestAgg {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     return exec;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index d1c756f..5429bcf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -64,7 +64,7 @@ public class TestSimpleFilter {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     while(exec.next()){
       assertEquals(50, exec.getRecordCount());
@@ -88,7 +88,7 @@ public class TestSimpleFilter {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     int recordCount = 0;
     while(exec.next()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 41cb034..0120c7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -70,7 +70,7 @@ public class TestMergeJoin {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     int totalRecordCount = 0;
@@ -124,7 +124,7 @@ public class TestMergeJoin {
             .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
             .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     int totalRecordCount = 0;
@@ -177,7 +177,7 @@ public class TestMergeJoin {
             .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
             .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     int totalRecordCount = 0;
@@ -230,7 +230,7 @@ public class TestMergeJoin {
             .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
             .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     int totalRecordCount = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 1ee9ceb..d82c0e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -95,7 +95,7 @@ public class TestSimpleLimit {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     int recordCount = 0;
     while(exec.next()){
@@ -114,7 +114,7 @@ public class TestSimpleLimit {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     int recordCount = 0;
     long sum = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index 72dbe6e..fa67e06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -67,7 +67,7 @@ public class TestSimpleProjection {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/project/test1.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
index 4472668..a5837ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -66,7 +66,7 @@ public class TestSimpleSort {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/one_key_sort.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     
     int previousInt = Integer.MIN_VALUE;
@@ -114,7 +114,7 @@ public class TestSimpleSort {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/two_key_sort.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     
     int previousInt = Integer.MIN_VALUE;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index 5d2b67c..b53d1d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -66,7 +66,7 @@ public class TestSVRemover {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/remover/test1.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     while(exec.next()){
       int count = exec.getRecordCount();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index a3d923e..11aae2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -70,7 +70,7 @@ public class TestTraceMultiRecordBatch {
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
         PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/multi_record_batch_trace.json"), Charsets.UTF_8));
         FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
         SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
         while(exec.next()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index 7643cf8..10ce997 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -84,7 +84,7 @@ public class TestTraceOutputDump {
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
         PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
         FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
         SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
         while(exec.next()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index fe90ad4..836f177 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -64,7 +64,7 @@ public class TestSimpleUnion {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/union/test1.json"), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
-    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
     
     int[] counts = new int[]{100,50};

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
new file mode 100644
index 0000000..4406e04
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class FileTest {
+  public static void main(String[] args) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "sync:///");
+    System.out.println(FileSystem.getDefaultUri(conf));
+    FileSystem fs = FileSystem.get(conf);
+//    FileSystem fs = new LocalSyncableFileSystem(conf);
+    Path path = new Path("/tmp/testFile");
+    FSDataOutputStream out = fs.create(path);
+    byte[] s = "hello world".getBytes();
+    out.write(s);
+    out.sync();
+//    out.close();
+    FSDataInputStream in = fs.open(path);
+    byte[] bytes = new byte[s.length];
+    in.read(bytes);
+    System.out.println(new String(bytes));
+    File file = new File("/tmp/testFile");
+    FileOutputStream fos = new FileOutputStream(file);
+    FileInputStream fis = new FileInputStream(file);
+    fos.write(s);
+    fos.getFD().sync();
+    fis.read(bytes);
+    System.out.println(new String(bytes));
+    out = fs.create(new Path("/tmp/file"));
+    for (int i = 0; i < 100; i++) {
+      bytes = new byte[256*1024];
+      Stopwatch watch = new Stopwatch();
+      watch.start();
+      out.write(bytes);
+      out.sync();
+      long t = watch.elapsed(TimeUnit.MILLISECONDS);
+      System.out.printf("Elapsed: %d. Rate %d.\n", t, (long) ((long) bytes.length * 1000L / t));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
new file mode 100644
index 0000000..cf5e128
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Runs the physical plan {@code /work/batch/multiple_exchange.json} against a single local
+ * Drillbit configured from {@code drill-spool-test-module.conf} and checks the total number of
+ * rows reported by the returned result batches.
+ */
+public class TestSpoolingBuffer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpoolingBuffer.class);
+
+  @Test
+  public void testMultipleExchangesSingleThread() throws Exception {
+    final RemoteServiceSet localServices = RemoteServiceSet.getLocalServiceSet();
+    final DrillConfig spoolConfig = DrillConfig.create("drill-spool-test-module.conf");
+
+    try (Drillbit drillbit = new Drillbit(spoolConfig, localServices);
+         DrillClient drillClient = new DrillClient(spoolConfig, localServices.getCoordinator())) {
+
+      drillbit.run();
+      drillClient.connect();
+
+      String planJson = Files.toString(FileUtils.getResourceAsFile("/work/batch/multiple_exchange.json"),
+              Charsets.UTF_8);
+      List<QueryResultBatch> batches = drillClient.runQuery(UserProtos.QueryType.PHYSICAL, planJson);
+
+      // Adding the row count of an empty batch contributes nothing, so every batch is summed.
+      int totalRows = 0;
+      for (QueryResultBatch batch : batches) {
+        totalRows += batch.getHeader().getRowCount();
+      }
+      assertEquals(500024, totalRows);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-module.conf b/exec/java-exec/src/test/resources/drill-module.conf
index 99dd863..5803610 100644
--- a/exec/java-exec/src/test/resources/drill-module.conf
+++ b/exec/java-exec/src/test/resources/drill-module.conf
@@ -56,8 +56,8 @@ drill.exec: {
     start: 35000
   },
   work: {
-    max.width.per.endpoint: 5,
+    max.width.per.endpoint: 1,
     global.max.width: 100,
-    executor.threads: 4
+    executor.threads: 1
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-spool-test-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-spool-test-module.conf b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
new file mode 100644
index 0000000..c20cc85
--- /dev/null
+++ b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
@@ -0,0 +1,83 @@
+//  This file tells Drill to consider this module when class path scanning.  
+//  This file can also include any supplementary configuration information.  
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
+// Drillbit settings used by TestSpoolingBuffer: one thread for each RPC server/client and for
+// the executor, with the spooling implementation set to SpoolingRawBatchBuffer.
+drill.exec: {
+  cluster-id: "drillbits1"
+  rpc: {
+    user: {
+      server: {
+        port: 31010
+        threads: 1
+      }
+      client: {
+        threads: 1
+      }
+    },
+    bit: {
+      server: {
+        port : 31011,
+        retry:{
+          count: 7200,
+          delay: 500
+        },
+        threads: 1
+      }
+    },
+    use.ip : false
+  },
+  operator: {
+    packages += "org.apache.drill.exec.physical.config"
+  },
+  optimizer: {
+    implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+  },
+  storage: {
+    packages += "org.apache.drill.exec.store"
+  },
+  metrics : {
+    context: "drillbit",
+    jmx: {
+      enabled : true
+    },
+    log: {
+      enabled : false,
+      interval : 60
+    }
+  },
+  zk: {
+    connect: "localhost:2181",
+    root: "/drill",
+    refresh: 500,
+    timeout: 5000,
+    retry: {
+      count: 7200,
+      delay: 500
+    }
+  },
+  // Declared exactly once. This key used to appear twice in this object with the same value;
+  // in HOCON the later duplicate wins, so an edit to the earlier copy would have had no effect.
+  functions: ["org.apache.drill.expr.fn.impl"],
+  network: {
+    start: 35000
+  },
+  work: {
+    max.width.per.endpoint: 1,
+    global.max.width: 100,
+    executor.threads: 1
+  },
+  trace: {
+    directory: "/var/log/drill",
+    filesystem: "file:///"
+  },
+  tmp: {
+    directories: ["/tmp/drill"],
+    filesystem: "drill-local:///"
+  },
+  // NOTE(review): size: 0 presumably makes the buffer spool from the first batch, and
+  // delete: false presumably leaves spool files on disk -- confirm in SpoolingRawBatchBuffer.
+  spooling: {
+    impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+    delete: false,
+    size: 0
+  }
+}
\ No newline at end of file


[09/10] git commit: DRILL-297: Trace operator throws NPE if the configured path is not writable

Posted by ja...@apache.org.
DRILL-297: Trace operator throws NPE if the configured path is not writable


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/366bf8e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/366bf8e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/366bf8e4

Branch: refs/heads/master
Commit: 366bf8e4564333dd26b1f5672d4dd0fea28afacc
Parents: 316ce8a
Author: Mehant Baid <me...@github.com>
Authored: Sun Dec 1 20:09:04 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:09:04 2013 -0800

----------------------------------------------------------------------
 .../apache/drill/exec/physical/impl/trace/TraceRecordBatch.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/366bf8e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 1b990c9..c138aba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -32,6 +32,7 @@ import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -79,7 +80,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
   /* File descriptors needed to be able to dump to log file */
   private OutputStream fos;
 
-  public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+  public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
     super(pop, context, incoming);
     this.traceTag = pop.traceTag;
     logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
@@ -95,7 +96,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
       /* create the file */
       fos = fs.create(new Path(fileName));
     } catch (IOException e) {
-      logger.error("Unable to create file: " + fileName);
+        throw new ExecutionSetupException("Unable to create file: " + fileName + " check permissions or if directory exists", e);
     }
   }
 


[04/10] git commit: DRILL-313: Fix for Limit operator only transferring buffers on new schema

Posted by ja...@apache.org.
DRILL-313: Fix for Limit operator only transferring buffers on new schema


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c0389f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c0389f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c0389f3

Branch: refs/heads/master
Commit: 6c0389f394a789beb74103309f5bed13ddeccf95
Parents: b91f2e8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 19:48:57 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:48:57 2013 -0800

----------------------------------------------------------------------
 .../physical/impl/limit/LimitRecordBatch.java   |  8 ++--
 .../drill/exec/fn/impl/GeneratorFunctions.java  | 22 ++++++++-
 .../physical/impl/limit/TestSimpleLimit.java    | 35 ++++++++++++++
 .../src/test/resources/limit/test4.json         | 49 ++++++++++++++++++++
 4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8390997..712af9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -36,6 +36,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
+  List<TransferPair> transfers = Lists.newArrayList();
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context, incoming);
@@ -52,7 +53,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   protected void setupNewSchema() throws SchemaChangeException {
     container.clear();
 
-    List<TransferPair> transfers = Lists.newArrayList();
 
     for(VectorWrapper<?> v : incoming){
       TransferPair pair = v.getValueVector().getTransferPair();
@@ -74,9 +74,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
     container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
 
-    for(TransferPair tp : transfers) {
-      tp.transfer();
-    }
   }
 
   @Override
@@ -96,6 +93,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected void doWork() {
+    for(TransferPair tp : transfers) {
+      tp.transfer();
+    }
     skipBatch = false;
     int recordCount = incoming.getRecordCount();
     if(recordCount <= recordsToSkip) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
index d12633e..b79ccd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.Output;
 import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -38,17 +39,36 @@ public class GeneratorFunctions {
           OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt");
   public static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true),
           OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8");
+  public static final FunctionDefinition INCREASING_BIGINT = FunctionDefinition.simple("increasingBigInt", new ArgumentValidators.NumericTypeAllowed(1, true),
+          OutputTypeDeterminer.FIXED_BIGINT, "increasingBigInt");
 
   public static class Provider implements CallProvider {
 
     @Override
     public FunctionDefinition[] getFunctionDefintions() {
       return new FunctionDefinition[] { RANDOM_BIG_INT,
-                                        RANDOM_FLOAT8 };
+                                        RANDOM_FLOAT8,
+                                        INCREASING_BIGINT };
     }
 
   }
 
+  /**
+   * Test-only implementation of {@code increasingBigInt(start)}.
+   *
+   * <p>Each {@link #eval()} writes {@code start.value} plus a running counter to {@code out} and
+   * then advances the counter by one; {@link #setup(RecordBatch)} resets the counter to zero.
+   * Presumably {@code eval()} is invoked once per record, in order, so a constant {@code start}
+   * yields {@code start, start + 1, start + 2, ...} -- confirm against the function framework.
+   */
+  @FunctionTemplate(name = "increasingBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class IncreasingBigInt implements DrillSimpleFunc {
+
+    // Input argument: the base value added to the counter on every evaluation.
+    @Param BigIntHolder start;
+    // Running counter carried across eval() calls.
+    @Workspace long current;
+    // Result holder written by eval().
+    @Output BigIntHolder out;
+
+    public void setup(RecordBatch incoming) {
+      current = 0;
+    }
+
+    public void eval() {
+      // Post-increment: the current counter value is used first, then bumped for the next call.
+      out.value = start.value + current++;
+    }
+  }
+
   @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
   public static class RandomBigIntGauss implements DrillSimpleFunc {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index b254fc0..1ee9ceb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -38,8 +38,11 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -79,6 +82,13 @@ public class TestSimpleLimit {
     }};
 
     verifyLimitCount(bitContext, connection, "test2.json", 69999);
+    long start = 30000;
+    long end = 100000;
+    long expectedSum = (end - start) * (end + start - 1) / 2; //Formula for sum of series
+   
+    verifySum(bitContext, connection, "test4.json", 70000, expectedSum);
+    
+
   }
 
   private void verifyLimitCount(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount) throws Throwable {
@@ -99,4 +109,29 @@ public class TestSimpleLimit {
     }
     assertTrue(!context.isFailed());
   }
+
+  /**
+   * Executes the physical plan {@code /limit/<testPlan>} and checks both the number of records
+   * produced and the sum of the values in the first output vector, which is read as a
+   * {@link BigIntVector}.
+   *
+   * @param bitContext    drillbit context handed to the fragment context
+   * @param connection    client connection handed to the fragment context
+   * @param testPlan      plan file name, resolved under {@code /limit/}
+   * @param expectedCount expected total record count over all batches
+   * @param expectedSum   expected sum of all values in the first vector over all batches
+   * @throws Throwable the fragment's failure cause, if one was recorded
+   */
+  private void verifySum(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount, long expectedSum) throws Throwable {
+    PhysicalPlanReader planReader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    String planJson = Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8);
+    PhysicalPlan plan = planReader.readPhysicalPlan(planJson);
+    FunctionImplementationRegistry fnRegistry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, fnRegistry);
+    FragmentRoot rootOperator = (FragmentRoot) plan.getSortedOperators(false).iterator().next();
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, rootOperator));
+
+    int totalRecords = 0;
+    long total = 0;
+    while (exec.next()) {
+      totalRecords += exec.getRecordCount();
+      // Only the first vector of each batch is summed.
+      BigIntVector column = (BigIntVector) exec.iterator().next();
+      int valueCount = column.getAccessor().getValueCount();
+      for (int row = 0; row < valueCount; row++) {
+        total += column.getAccessor().get(row);
+      }
+    }
+
+    assertEquals(expectedCount, totalRecords);
+    assertEquals(expectedSum, total);
+
+    Throwable failure = context.getFailureCause();
+    if (failure != null) {
+      throw failure;
+    }
+    assertTrue(!context.isFailed());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/resources/limit/test4.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test4.json b/exec/java-exec/src/test/resources/limit/test4.json
new file mode 100644
index 0000000..b7793b1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test4.json
@@ -0,0 +1,49 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100000000, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
+        },
+        {
+           @id:2,
+           child: 1,
+           pop:"project",
+           exprs: [
+             { ref: "col1", expr:"increasingBigInt(0)"}
+           ]
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"limit",
+            first:30000,
+            last:100000
+        },
+        {
+          @id:4,
+          child:3,
+          pop: "selection-vector-remover"
+
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file