Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/09 09:23:52 UTC

[5/8] drill git commit: DRILL-133: LocalExchange planning and exec.

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index d99f40d..e908538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -50,6 +50,7 @@ import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
 import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
+import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
 import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
 import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
@@ -290,13 +291,6 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
 
 
     /* 4.)
-     * Next, we add any required selection vector removers given the supported encodings of each
-     * operator. This will ultimately move to a new trait but we're managing here for now to avoid
-     * introducing new issues in planning before the next release
-     */
-    phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
-
-    /* 5.)
      * Add ProducerConsumer after each scan if the option is set
      * Use the configured queueSize
      */
@@ -308,7 +302,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     */
 
 
-    /* 6.)
+    /* 5.)
      * if the client does not support complex types (Map, Repeated)
      * insert a project which would convert them to JSON
      */
@@ -317,7 +311,21 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
     }
 
+
+    /* 6.)
+     * Insert LocalExchange (mux and/or demux) nodes
+     */
+    phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions);
+
+
     /* 7.)
+     * Next, we add any required selection vector removers given the supported encodings of each
+     * operator. This will ultimately move to a new trait but we're managing here for now to avoid
+     * introducing new issues in planning before the next release
+     */
+    phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
+
+    /* 8.)
      * Finally, make sure that no rels are repeated.
      * This could happen in the case of querying the same table twice as Optiq may canonicalize these.
      */
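The new step 6 is the interesting part of this hunk: when mux is enabled, a local exchange is inserted below each hash-to-random exchange so that the records produced by all minor fragments on a node are merged into one stream before partitioning, cutting the partition senders per node down to one. A self-contained sketch of that rewrite, using simplified stand-in node types rather than Drill's actual Prel classes and visitor (the real InsertLocalExchangeVisitor also handles the demux side and consults queryOptions):

import java.util.ArrayList;
import java.util.List;

public class LocalExchangeSketch {
  static class PlanNode {
    final String name;
    final List<PlanNode> inputs = new ArrayList<>();
    PlanNode(String name, PlanNode... children) {
      this.name = name;
      for (PlanNode c : children) {
        inputs.add(c);
      }
    }
  }

  // Walk the tree bottom-up; when mux is enabled, wrap each child of a
  // hash-to-random exchange in a local mux node so that the minor fragments on
  // a node feed a single sender instead of one sender per fragment.
  static PlanNode insertMux(PlanNode node, boolean muxEnabled) {
    for (int i = 0; i < node.inputs.size(); i++) {
      PlanNode rewritten = insertMux(node.inputs.get(i), muxEnabled);
      if (muxEnabled && node.name.equals("hash-to-random-exchange")) {
        rewritten = new PlanNode("unordered-mux-exchange", rewritten);
      }
      node.inputs.set(i, rewritten);
    }
    return node;
  }

  public static void main(String[] args) {
    PlanNode plan = new PlanNode("screen",
        new PlanNode("hash-to-random-exchange", new PlanNode("scan")));
    insertMux(plan, true);
    // prints "unordered-mux-exchange": the mux now sits between scan and the hash exchange
    System.out.println(plan.inputs.get(0).inputs.get(0).name);
  }
}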

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 7ad3f77..d821af8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -51,6 +51,8 @@ public class SystemOptionManager implements OptionManager {
       PlannerSettings.BROADCAST_THRESHOLD,
       PlannerSettings.BROADCAST_FACTOR,
       PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR,
+      PlannerSettings.MUX_EXCHANGE,
+      PlannerSettings.DEMUX_EXCHANGE,
       PlannerSettings.PRODUCER_CONSUMER,
       PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,
       PlannerSettings.HASH_SINGLE_KEY,
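The two new entries refer to validators in PlannerSettings that are not part of this diff. Judging from the ALTER SESSION calls in TestLocalExchange further down, they are boolean options named planner.enable_mux_exchange and planner.enable_demux_exchange; their definitions presumably look something like the sketch below (the defaults shown are guesses):

// Assumed shape of the new PlannerSettings fields (not shown in this diff);
// the option names are confirmed by TestLocalExchange, the defaults are guesses.
public static final OptionValidator MUX_EXCHANGE =
    new TypeValidators.BooleanValidator("planner.enable_mux_exchange", true);
public static final OptionValidator DEMUX_EXCHANGE =
    new TypeValidators.BooleanValidator("planner.enable_demux_exchange", false);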

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 9902443..54cad56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -167,7 +167,6 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    assert chunks != null && chunks.size() > 0;
     if (endpointAffinities == null) {
         logger.debug("chunks: {}", chunks.size());
         endpointAffinities = AffinityCreator.getAffinityMap(chunks);

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index aa1609d..262c6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -69,11 +69,6 @@ public class DirectGroupScan extends AbstractGroupScan{
   }
 
   @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
   public String getDigest() {
     return String.valueOf(reader);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 8335ed9..22cc483 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -106,11 +106,6 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
   }
 
   @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
   public String getDigest() {
     return this.table.toString() + ", filter=" + filter;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 5736df8..dc90a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -167,11 +167,6 @@ public class MockGroupScanPOP extends AbstractGroupScan {
 
   }
 
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 053f5de..cdd0d18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -100,12 +100,6 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
   }
 
   @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-
-  @Override
   public int getOperatorType() {
     return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
new file mode 100644
index 0000000..4277580
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+
+/**
+ * Simple map data structure for (int -> int) mappings where the maximum key value is below 2^16. Keys are used
+ * directly as array indices, so lookups avoid hashing entirely. Not thread-safe. Keys and values are expected
+ * to be >= 0.
+ */
+public class ArrayWrappedIntIntMap {
+  // Note: '-' binds tighter than '<<', so the parentheses are required.
+  private static final int MAX_KEY_VALUE = (1 << 16) - 1;
+  private static final int INITIAL_MAX_KEY_VALUE = (1 << 8) - 1;
+  private int[] values;
+
+  public ArrayWrappedIntIntMap() {
+    values = new int[INITIAL_MAX_KEY_VALUE + 1];
+    Arrays.fill(values, Integer.MIN_VALUE);
+  }
+
+  public void put(final int key, final int value) {
+    Preconditions.checkArgument(key >= 0 && key <= MAX_KEY_VALUE,
+        String.format("Index should be in range [0, %d], given [%d].", MAX_KEY_VALUE, key));
+    Preconditions.checkArgument(value >= 0, String.format("Value must be non-negative, given [%d]", value));
+
+    // resize the values array if the key falls beyond the current size of the array
+    if (values.length < key + 1) {
+      // Grow to the next power of 2 above the given key
+      int newValuesLength = Integer.highestOneBit(key) * 2;
+      int[] newValues = Arrays.copyOf(values, newValuesLength);
+      // Arrays.fill takes an exclusive toIndex, so pass newValues.length to mark every new slot as unset
+      Arrays.fill(newValues, values.length, newValues.length, Integer.MIN_VALUE);
+      values = newValues;
+    }
+
+    values[key] = value;
+  }
+
+  /**
+   * Returns the value mapped to the given key.
+   * If no value was set through put(), this either returns Integer.MIN_VALUE or throws
+   * ArrayIndexOutOfBoundsException. No error checking is done, to keep retrieval fast.
+   */
+  public int get(int key) {
+    return values[key];
+  }
+}
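A brief usage sketch of the class above; the behaviors noted in the comments follow directly from the implementation (256-slot initial array, power-of-two growth, Integer.MIN_VALUE as the unset marker):

import org.apache.drill.exec.util.ArrayWrappedIntIntMap;

public class ArrayWrappedIntIntMapExample {
  public static void main(String[] args) {
    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
    map.put(3, 42);   // fits in the initial 256-slot array
    map.put(300, 7);  // grows the backing array to the next power of two (512 slots)

    System.out.println(map.get(300)); // 7
    System.out.println(map.get(5));   // Integer.MIN_VALUE: slot allocated but never set
    // map.get(600) would throw ArrayIndexOutOfBoundsException (beyond the current array)
  }
}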

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index c83106c..ed16314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -26,25 +26,33 @@ import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.Receiver;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
 
 public abstract class AbstractDataCollector implements DataCollector{
 
-  private final List<DrillbitEndpoint> incoming;
+  private final List<MinorFragmentEndpoint> incoming;
   private final int oppositeMajorFragmentId;
   private final AtomicIntegerArray remainders;
   private final AtomicInteger remainingRequired;
-  protected final RawBatchBuffer[] buffers;
   private final AtomicInteger parentAccounter;
-  private final AtomicInteger finishedStreams = new AtomicInteger();
-  private final FragmentContext context;
 
-  public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
-    Preconditions.checkArgument(minInputsRequired > 0);
+  protected final RawBatchBuffer[] buffers;
+  protected final ArrayWrappedIntIntMap fragmentMap;
+
+  /**
+   * @param parentAccounter Counter decremented once this collector has seen batches from enough fragments.
+   * @param receiver Receiver that supplies the providing endpoints and the opposite major fragment id.
+   * @param numBuffers Number of RawBatchBuffer inputs required to store the incoming data.
+   * @param bufferCapacity Capacity of each RawBatchBuffer.
+   * @param context Fragment context, used to look up configuration and report failures.
+   */
+  public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver,
+      final int numBuffers, final int bufferCapacity, FragmentContext context) {
     Preconditions.checkNotNull(receiver);
     Preconditions.checkNotNull(parentAccounter);
 
@@ -52,34 +60,43 @@ public abstract class AbstractDataCollector implements DataCollector{
     this.incoming = receiver.getProvidingEndpoints();
     this.remainders = new AtomicIntegerArray(incoming.size());
     this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
-    this.buffers = new RawBatchBuffer[minInputsRequired];
-    this.context = context;
+
+    // Create a mapping from minor fragment id to an index in the range [0, incoming.size()-1].
+    // We use this mapping to locate the objects belonging to a fragment in the buffers and remainders arrays.
+    fragmentMap = new ArrayWrappedIntIntMap();
+    int index = 0;
+    for(MinorFragmentEndpoint endpoint : incoming) {
+      fragmentMap.put(endpoint.getId(), index);
+      index++;
+    }
+
+    buffers = new RawBatchBuffer[numBuffers];
+    remainingRequired = new AtomicInteger(numBuffers);
+
     try {
       String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
       Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
-      for(int i = 0; i < buffers.length; i++) {
-          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, receiver.supportsOutOfOrderExchange() ? incoming.size() : 1);
+
+      for(int i=0; i<numBuffers; i++) {
+        buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, bufferCapacity);
       }
     } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
             NoSuchMethodException | ClassNotFoundException e) {
       context.fail(e);
     }
-    if (receiver.supportsOutOfOrderExchange()) {
-      this.remainingRequired = new AtomicInteger(1);
-    } else {
-      this.remainingRequired = new AtomicInteger(minInputsRequired);
-    }
   }
 
+  @Override
   public int getOppositeMajorFragmentId() {
     return oppositeMajorFragmentId;
   }
 
+  @Override
   public RawBatchBuffer[] getBuffers(){
     return buffers;
   }
 
-
+  @Override
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch)  throws IOException {
 
     // if we received an out of memory, add an item to all the buffer queues.
@@ -91,7 +108,7 @@ public abstract class AbstractDataCollector implements DataCollector{
 
     // check to see if we have enough fragments reporting to proceed.
     boolean decremented = false;
-    if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
+    if (remainders.compareAndSet(fragmentMap.get(minorFragmentId), 0, 1)) {
       int rem = remainingRequired.decrementAndGet();
       if (rem == 0) {
         parentAccounter.decrementAndGet();
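The key change here: with mux/demux exchanges in the plan, the minor fragment ids feeding a receiver are no longer guaranteed to be the dense range 0..n-1, so the collector compacts them into array indices instead of indexing by the raw id. (Note also that the old minInputsRequired parameter is split into numBuffers and bufferCapacity; as the two subclasses below show, a merging collector uses one buffer of capacity n while a partitioned collector uses n buffers of capacity 1.) A hypothetical illustration of the remapping:

// Suppose a receiver's providing endpoints carry minor fragment ids {2, 5, 9}:
ArrayWrappedIntIntMap fragmentMap = new ArrayWrappedIntIntMap();
fragmentMap.put(2, 0);
fragmentMap.put(5, 1);
fragmentMap.put(9, 2);

// batchArrived(5, batch) now flips remainders index 1 and, in the partitioned
// case, getBuffer(5) returns buffers[1], rather than remainders[5]/buffers[5].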

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index ce14260..8c09f80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -24,11 +24,8 @@ import org.apache.drill.exec.physical.base.Receiver;
 
 public class MergingCollector extends AbstractDataCollector{
 
-  private AtomicInteger streamsRunning;
-
   public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
-    super(parentAccounter, receiver, 1, context);
-    streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
+    super(parentAccounter, receiver, 1, receiver.getProvidingEndpoints().size(), context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 5190d84..7ce9074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -25,11 +25,11 @@ import org.apache.drill.exec.physical.base.Receiver;
 public class PartitionedCollector extends AbstractDataCollector{
 
   public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
-    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), context);
+    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), 1, context);
   }
 
   @Override
   protected RawBatchBuffer getBuffer(int minorFragmentId) {
-    return buffers[minorFragmentId];
+    return buffers[fragmentMap.get(minorFragmentId)];
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 378e81a..409450f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -47,9 +47,7 @@ import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -370,15 +368,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
 
   private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
     PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
-    MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
-    Fragment rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
-    PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
-    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
-
+    Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
 
+    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
     return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
-        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet,
-        initiatingClient.getSession());
+        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index cf99577..4d2a0df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
@@ -45,6 +46,7 @@ import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.util.VectorUtil;
 import org.junit.AfterClass;
@@ -60,6 +62,13 @@ import com.google.common.io.Resources;
 public class BaseTestQuery extends ExecTest{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
+  /**
+   * Number of Drillbits in test cluster. Default is 1.
+   *
+   * Tests can update the cluster size through {@link #setDrillbitCount(int)}
+   */
+  private static int drillbitCount = 1;
+
   private int[] columnWidths = new int[] { 8 };
 
   private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
@@ -84,12 +93,37 @@ public class BaseTestQuery extends ExecTest{
   };
 
   protected static DrillClient client;
-  protected static Drillbit bit;
+  protected static Drillbit[] bits;
   protected static RemoteServiceSet serviceSet;
   protected static DrillConfig config;
   protected static QuerySubmitter submitter = new QuerySubmitter();
   protected static BufferAllocator allocator;
 
+  protected static void setDrillbitCount(int newDrillbitCount) {
+    Preconditions.checkArgument(newDrillbitCount > 0, "Number of Drillbits must be at least one");
+    if (drillbitCount != newDrillbitCount) {
+      // TODO: Currently we have to shutdown the existing Drillbit cluster before starting a new one with the given
+      // Drillbit count. Revisit later to avoid stopping the cluster.
+      try {
+        closeClient();
+        drillbitCount = newDrillbitCount;
+        openClient();
+      } catch(Exception e) {
+        throw new RuntimeException("Failure while changing the number of Drillbits in test cluster.", e);
+      }
+    }
+  }
+
+  /**
+   * Useful for tests that require a DrillbitContext to get/add storage plugins, options etc.
+   *
+   * @return DrillbitContext of first Drillbit in the cluster.
+   */
+  protected static DrillbitContext getDrillbitContext() {
+    Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not set up.");
+    return bits[0].getContext();
+  }
+
   static void resetClientAndBit() throws Exception{
     closeClient();
     openClient();
@@ -104,8 +138,13 @@ public class BaseTestQuery extends ExecTest{
     } else {
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
-    bit = new Drillbit(config, serviceSet);
-    bit.run();
+
+    bits = new Drillbit[drillbitCount];
+    for(int i=0; i<drillbitCount; i++) {
+      bits[i] = new Drillbit(config, serviceSet);
+      bits[i].run();
+    }
+
     client = new DrillClient(config, serviceSet.getCoordinator());
     client.connect();
     List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY));
@@ -118,11 +157,11 @@ public class BaseTestQuery extends ExecTest{
     return allocator;
   }
 
-  public TestBuilder newTest() {
+  public static TestBuilder newTest() {
     return testBuilder();
   }
 
-  public TestBuilder testBuilder() {
+  public static TestBuilder testBuilder() {
     return new TestBuilder(allocator);
   }
 
@@ -131,9 +170,15 @@ public class BaseTestQuery extends ExecTest{
     if (client != null) {
       client.close();
     }
-    if (bit != null) {
-      bit.close();
+
+    if (bits != null) {
+      for(Drillbit bit : bits) {
+        if (bit != null) {
+          bit.close();
+        }
+      }
     }
+
     if(serviceSet != null) {
       serviceSet.close();
     }
@@ -142,21 +187,21 @@ public class BaseTestQuery extends ExecTest{
     }
   }
 
-  protected void runSQL(String sql) throws Exception {
+  protected static void runSQL(String sql) throws Exception {
     SilentListener listener = new SilentListener();
     testWithListener(QueryType.SQL, sql, listener);
     listener.waitForCompletion();
   }
 
-  protected List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
+  protected static List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
     return testRunAndReturn(QueryType.SQL, sql);
   }
 
-  protected List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
+  protected static List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
     return testRunAndReturn(QueryType.LOGICAL, logical);
   }
 
-  protected List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
+  protected static List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
     return testRunAndReturn(QueryType.PHYSICAL, physical);
   }
 
@@ -172,7 +217,7 @@ public class BaseTestQuery extends ExecTest{
     return resultListener.await();
   }
 
-  protected void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
+  protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
     query = normalizeQuery(query);
     client.runQuery(type, query, resultListener);
   }
@@ -216,15 +261,15 @@ public class BaseTestQuery extends ExecTest{
     return query;
   }
 
-  protected int testLogical(String query) throws Exception{
+  protected static int testLogical(String query) throws Exception{
     return testRunAndPrint(QueryType.LOGICAL, query);
   }
 
-  protected int testPhysical(String query) throws Exception{
+  protected static int testPhysical(String query) throws Exception{
     return testRunAndPrint(QueryType.PHYSICAL, query);
   }
 
-  protected int testSql(String query) throws Exception{
+  protected static int testSql(String query) throws Exception{
     return testRunAndPrint(QueryType.SQL, query);
   }
 

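A minimal sketch of how a test opts into a multi-node cluster with the new hook (the class name here is hypothetical; TestLocalExchange below does exactly this with CLUSTER_SIZE = 3):

import org.junit.BeforeClass;

public class MyMultiNodeTest extends BaseTestQuery {
  @BeforeClass
  public static void setupCluster() {
    // Restarts the shared test cluster with three Drillbits; the default is 1.
    setDrillbitCount(3);
  }
}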
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index c95d2b2..36091af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -288,12 +288,11 @@ public class PlanTestBase extends BaseTestQuery {
    * This will submit an "EXPLAIN" statement, and return the column value which
    * contains the plan's string.
    */
-  protected String getPlanInString(String sql, String columnName)
+  protected static String getPlanInString(String sql, String columnName)
       throws Exception {
     List<QueryResultBatch> results = testSqlWithResults(sql);
 
-    RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
-        .getAllocator());
+    RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
     StringBuilder builder = new StringBuilder();
 
     for (QueryResultBatch b : results) {

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index bb855c9..7ff00fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -43,7 +43,7 @@ public class TestClassTransformation extends BaseTestQuery {
 
   @BeforeClass
   public static void beforeTestClassTransformation() throws Exception {
-    sessionOptions = new SessionOptionManager(bit.getContext().getOptionManager());
+    sessionOptions = new SessionOptionManager(getDrillbitContext().getOptionManager());
   }
 
   @Test
@@ -113,7 +113,7 @@ public class TestClassTransformation extends BaseTestQuery {
 
   private <T, X extends T> CodeGenerator<T> newCodeGenerator(Class<T> iface, Class<X> impl) {
     final TemplateClassDefinition<T> template = new TemplateClassDefinition<T>(iface, impl);
-    CodeGenerator<T> cg = CodeGenerator.get(template, bit.getContext().getFunctionImplementationRegistry());
+    CodeGenerator<T> cg = CodeGenerator.get(template, getDrillbitContext().getFunctionImplementationRegistry());
 
     ClassGenerator<T> root = cg.getRoot();
     root.setMappingSet(new MappingSet(new GeneratorMapping("doOutside", null, null, null)));

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
new file mode 100644
index 0000000..72715a7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * This test starts a Drill cluster with CLUSTER_SIZE nodes and generates data for test tables.
+ *
+ * Test queries involve HashToRandomExchange (group by and join) and verify the following:
+ *   1. The plan has the expected mux and demux exchanges inserted.
+ *   2. Running the query produces the expected records.
+ *   3. Taking the plan from (1) and parallelizing it with SimpleParallelizer yields PlanFragments in which the
+ *   number of partition senders in a major fragment is no more than the number of Drillbit nodes in the cluster,
+ *   with at most one partition sender per Drillbit.
+ */
+public class TestLocalExchange extends PlanTestBase {
+
+  public static TemporaryFolder testTempFolder = new TemporaryFolder();
+
+  private final static int CLUSTER_SIZE = 3;
+  private final static String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
+  private final static String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
+  private final static UserSession USER_SESSION = UserSession.Builder.newBuilder()
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .build();
+
+  private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+      1 /*parallelizationThreshold (slice_count)*/,
+      6 /*maxWidthPerNode*/,
+      1000 /*maxGlobalWidth*/,
+      1.2 /*affinityFactor*/);
+
+  private final static int NUM_DEPTS = 40;
+  private final static int NUM_EMPLOYEES = 1000;
+
+  private static String empTableLocation;
+  private static String deptTableLocation;
+
+  private static String groupByQuery;
+  private static String joinQuery;
+
+  private static String[] joinQueryBaselineColumns;
+  private static String[] groupByQueryBaselineColumns;
+
+  private static List<Object[]> groupByQueryBaselineValues;
+  private static List<Object[]> joinQueryBaselineValues;
+
+  @BeforeClass
+  public static void setupClusterSize() {
+    setDrillbitCount(CLUSTER_SIZE);
+  }
+
+  @BeforeClass
+  public static void setupTempFolder() throws IOException {
+    testTempFolder.create();
+  }
+
+  /**
+   * Generate data for two tables. Each table consists of several JSON files.
+   */
+  @BeforeClass
+  public static void generateTestDataAndQueries() throws Exception {
+    // Table 1 consists of three columns "emp_id", "emp_name" and "dept_id"
+    empTableLocation = testTempFolder.newFolder().getAbsolutePath();
+
+    // Write 100 records for each new file
+    final int empNumRecsPerFile = 100;
+    for(int fileIndex=0; fileIndex<NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) {
+      File file = new File(empTableLocation + File.separator + fileIndex + ".json");
+      PrintWriter printWriter = new PrintWriter(file);
+      for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) {
+        String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }",
+            recordIndex, recordIndex, recordIndex % NUM_DEPTS);
+        printWriter.println(record);
+      }
+      printWriter.close();
+    }
+
+    // Table 2 consists of two columns "dept_id" and "dept_name"
+    deptTableLocation = testTempFolder.newFolder().getAbsolutePath();
+
+    // Write 4 records for each new file
+    final int deptNumRecsPerFile = 4;
+    for(int fileIndex=0; fileIndex<NUM_DEPTS/deptNumRecsPerFile; fileIndex++) {
+      File file = new File(deptTableLocation + File.separator + fileIndex + ".json");
+      PrintWriter printWriter = new PrintWriter(file);
+      for (int recordIndex = fileIndex*deptNumRecsPerFile; recordIndex < (fileIndex+1)*deptNumRecsPerFile; recordIndex++) {
+        String record = String.format("{ \"dept_id\" : %d, \"dept_name\" : \"Department %d\" }",
+            recordIndex, recordIndex);
+        printWriter.println(record);
+      }
+      printWriter.close();
+    }
+
+    // Initialize test queries
+    groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
+    joinQuery = String.format("SELECT e.emp_name, d.dept_name FROM dfs.`%s` e JOIN dfs.`%s` d ON e.dept_id = d.dept_id",
+        empTableLocation, deptTableLocation);
+
+    // Generate and store output data for test queries. Used when verifying the output of queries ran using different
+    // configurations.
+
+    groupByQueryBaselineColumns = new String[] { "dept_id", "numEmployees" };
+
+    groupByQueryBaselineValues = Lists.newArrayList();
+    // dept_id is generated by the expression 'recordIndex % NUM_DEPTS' above. 'recordIndex' runs from 0 to
+    // NUM_EMPLOYEES - 1, so we expect the number of occurrences of each dept_id to be
+    // NUM_EMPLOYEES/NUM_DEPTS (1000/40 = 25).
+    final int numOccurrences = NUM_EMPLOYEES/NUM_DEPTS;
+    for(int i = 0; i < NUM_DEPTS; i++) {
+      groupByQueryBaselineValues.add(new Object[] { (long)i, (long)numOccurrences});
+    }
+
+    joinQueryBaselineColumns = new String[] { "emp_name", "dept_name" };
+
+    joinQueryBaselineValues = Lists.newArrayList();
+    for(int i = 0; i < NUM_EMPLOYEES; i++) {
+      final String employee = String.format("Employee %d", i);
+      final String dept = String.format("Department %d", i % NUM_DEPTS);
+      joinQueryBaselineValues.add(new String[] { employee, dept });
+    }
+  }
+
+  public static void setupHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+    // set slice target to 1, so that we get more parallelization for testing
+    test("ALTER SESSION SET `planner.slice_target`=1");
+    // disable the broadcast join to produce plans with HashToRandomExchanges.
+    test("ALTER SESSION SET `planner.enable_broadcast_join`=false");
+    test("ALTER SESSION SET `planner.enable_mux_exchange`=" + isMuxOn);
+    test("ALTER SESSION SET `planner.enable_demux_exchange`=" + isDeMuxOn);
+  }
+
+  @Test
+  public void testGroupBy_NoMux_NoDeMux() throws Exception {
+    testGroupByHelper(false, false);
+  }
+
+  @Test
+  public void testJoin_NoMux_NoDeMux() throws Exception {
+    testJoinHelper(false, false);
+  }
+
+  @Test
+  public void testGroupBy_Mux_NoDeMux() throws Exception {
+    testGroupByHelper(true, false);
+  }
+
+  @Test
+  public void testJoin_Mux_NoDeMux() throws Exception {
+    testJoinHelper(true, false);
+  }
+
+  @Test
+  public void testGroupBy_NoMux_DeMux() throws Exception {
+    testGroupByHelper(false, true);
+  }
+
+  @Test
+  public void testJoin_NoMux_DeMux() throws Exception {
+    testJoinHelper(false, true);
+  }
+
+  @Test
+  public void testGroupBy_Mux_DeMux() throws Exception {
+    testGroupByHelper(true, true);
+  }
+
+  @Test
+  public void testJoin_Mux_DeMux() throws Exception {
+    testJoinHelper(true, true);
+  }
+
+  private static void testGroupByHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+    testHelper(isMuxOn, isDeMuxOn, groupByQuery,
+        isMuxOn ? 1 : 0, isDeMuxOn ? 1 : 0,
+        groupByQueryBaselineColumns, groupByQueryBaselineValues);
+  }
+
+  public static void testJoinHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+    testHelper(isMuxOn, isDeMuxOn, joinQuery,
+        isMuxOn ? 2 : 0, isDeMuxOn ? 2 : 0,
+        joinQueryBaselineColumns, joinQueryBaselineValues);
+  }
+
+  private static void testHelper(boolean isMuxOn, boolean isDeMuxOn, String query,
+      int expectedNumMuxes, int expectedNumDeMuxes, String[] baselineColumns, List<Object[]> baselineValues)
+      throws Exception {
+    setupHelper(isMuxOn, isDeMuxOn);
+
+    String plan = getPlanInString("EXPLAIN PLAN FOR " + query, JSON_FORMAT);
+    System.out.println("Plan: " + plan);
+
+    // Make sure the plan has the expected mux and demux exchanges (TODO: current testing is rudimentary;
+    // move to more thorough testing once better plan-testing tools are available)
+    assertEquals("Wrong number of MuxExchanges present in the plan",
+        expectedNumMuxes, StringUtils.countMatches(plan, MUX_EXCHANGE));
+
+    assertEquals("Wrong number of DeMuxExchanges present in the plan",
+        expectedNumDeMuxes, StringUtils.countMatches(plan, DEMUX_EXCHANGE));
+
+    // Run the query and verify the output
+    TestBuilder testBuilder = testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns(baselineColumns);
+
+    for(Object[] baselineRecord : baselineValues) {
+      testBuilder.baselineValues(baselineRecord);
+    }
+
+    testBuilder.go();
+
+    testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
+  }
+
+  // Verify that the number of partition senders in a major fragment is no more than the cluster size and that each
+  // endpoint in the cluster runs at most one partition-sender fragment from a given major fragment.
+  private static void testHelperVerifyPartitionSenderParallelization(
+      String plan, boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+
+    final DrillbitContext drillbitContext = getDrillbitContext();
+    final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
+    final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
+
+    final List<Integer> deMuxFragments = Lists.newLinkedList();
+    final List<Integer> htrFragments = Lists.newLinkedList();
+    final PlanningSet planningSet = new PlanningSet();
+
+    // Create a planningSet to get the assignment of major fragment ids to fragments.
+    PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);
+
+    findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);
+
+    QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
+        QueryId.getDefaultInstance(),
+        drillbitContext.getBits(), planReader, rootFragment, USER_SESSION);
+
+    // Make sure the number of minor fragments with a HashPartitioner within a major fragment is not more than the
+    // number of Drillbits in the cluster
+    ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap = ArrayListMultimap.create();
+    for(PlanFragment planFragment : qwu.getFragments()) {
+      if (planFragment.getFragmentJson().contains("hash-partition-sender")) {
+        int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
+        DrillbitEndpoint assignedEndpoint = planFragment.getAssignment();
+        partitionSenderMap.get(majorFragmentId).add(assignedEndpoint);
+      }
+    }
+
+    if (isMuxOn) {
+      verifyAssignment(htrFragments, partitionSenderMap);
+    }
+
+    if (isDeMuxOn) {
+      verifyAssignment(deMuxFragments, partitionSenderMap);
+    }
+  }
+
+  /**
+   * Helper method to find the major fragment ids of fragments that have a PartitionSender.
+   * A fragment has a PartitionSender if its sending exchange is a
+   *   1. DeMux exchange -> collected in deMuxFragments
+   *   2. HashToRandomExchange -> collected in htrFragments
+   */
+  private static void findFragmentsWithPartitionSender(Fragment currentRootFragment, PlanningSet planningSet,
+      List<Integer> deMuxFragments, List<Integer> htrFragments) {
+
+    if (currentRootFragment != null) {
+      final Exchange sendingExchange = currentRootFragment.getSendingExchange();
+      if (sendingExchange != null) {
+        final int majorFragmentId = planningSet.get(currentRootFragment).getMajorFragmentId();
+        if (sendingExchange instanceof UnorderedDeMuxExchange) {
+          deMuxFragments.add(majorFragmentId);
+        } else if (sendingExchange instanceof HashToRandomExchange) {
+          htrFragments.add(majorFragmentId);
+        }
+      }
+
+      for(ExchangeFragmentPair e : currentRootFragment.getReceivingExchangePairs()) {
+        findFragmentsWithPartitionSender(e.getNode(), planningSet, deMuxFragments, htrFragments);
+      }
+    }
+  }
+
+  /** Helper method to verify the partition-sender parallelization in the given major fragments' endpoint assignments */
+  private static void verifyAssignment(List<Integer> fragmentList,
+      ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap) {
+
+    // We expect at least one entry in the list
+    assertTrue(fragmentList.size() > 0);
+
+    for(Integer majorFragmentId : fragmentList) {
+      // we expect a fragment that has a DeMux/HashToRandom sending exchange to be parallelized to no more than
+      // the number of nodes in the cluster, with at most one assignment per node
+      List<DrillbitEndpoint> assignments = partitionSenderMap.get(majorFragmentId);
+      assertNotNull(assignments);
+      assertTrue(assignments.size() > 0);
+      assertTrue(String.format("Number of partition senders in major fragment [%d] is more than expected", majorFragmentId), CLUSTER_SIZE >= assignments.size());
+
+      // Make sure there are no duplicates in assigned endpoints (i.e at most one partition sender per endpoint)
+      assertTrue("Some endpoints have more than one fragment that has ParitionSender", ImmutableSet.copyOf(assignments).size() == assignments.size());
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTempFolder() throws IOException {
+    testTempFolder.delete();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 9a32ff9..3116fbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -54,11 +54,16 @@ public abstract class PopUnitTestBase  extends ExecTest{
     return i;
   }
 
-  public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException, ForemanSetupException {
-    MakeFragmentsVisitor f = new MakeFragmentsVisitor();
+  public static Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException,
+      IOException, ForemanSetupException {
+    return getRootFragmentFromPlanString(reader, Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+  }
+
 
-    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+  public static Fragment getRootFragmentFromPlanString(PhysicalPlanReader reader, String planString)
+      throws FragmentSetupException, IOException, ForemanSetupException {
+    PhysicalPlan plan = reader.readPhysicalPlan(planString);
     PhysicalOperator o = plan.getSortedOperators(false).iterator().next();
-    return o.accept(f, null);
+    return o.accept(MakeFragmentsVisitor.INSTANCE, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 6349b76..07d310a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared;
@@ -35,6 +34,8 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestFragmentChecker extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFragmentChecker.class);
 
@@ -44,12 +45,10 @@ public class TestFragmentChecker extends PopUnitTestBase{
 
   }
 
-  private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
-
+  private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception{
     System.out.println(String.format("=================Building plan fragments for [%s].  Allowing %d total Drillbits.==================", fragmentFile, bitCount));
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
-    PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
     SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);
     List<DrillbitEndpoint> endpoints = Lists.newArrayList();
     DrillbitEndpoint localBit = null;
@@ -61,7 +60,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
-    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet,
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot,
         UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build());
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
 
@@ -71,9 +70,9 @@ public class TestFragmentChecker extends PopUnitTestBase{
       System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
       System.out.print(f.getFragmentJson());
     }
-    //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
 
-    logger.debug("Planning Set {}", planningSet);
+    assertEquals(expectedFragmentCount,
+        qwu.getFragments().size() + 1 /* root fragment is not part of the getFragments() list*/);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 219e66f..9999be0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -147,8 +147,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
 
     List<QueryResultBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
     assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
-    RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
-        .getAllocator());
+    RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
 
     QueryResultBatch b = result.get(0);
     loader.load(b.getHeader().getDef(), b.getData());
@@ -166,8 +165,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   public void testNullableFilter() throws Exception {
     List<QueryResultBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
     assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
-    RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
-        .getAllocator());
+    RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
 
     QueryResultBatch b = result.get(0);
     loader.load(b.getHeader().getDef(), b.getData());

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
new file mode 100644
index 0000000..8539e59
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestArrayWrappedIntIntMap {
+  @Test
+  public void testSimple() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    map.put(0, 0);
+    map.put(1, 1);
+    map.put(9, 9);
+
+    assertEquals(0, map.get(0));
+    assertEquals(1, map.get(1));
+    assertEquals(9, map.get(9));
+  }
+
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void testInvalidKeyAccess() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    map.put(0, 0);
+    map.put(1, 1);
+    map.put(9, 9);
+
+    assertEquals(0, map.get(0));
+    assertEquals(1, map.get(1));
+    assertEquals(9, map.get(9));
+
+    assertEquals(Integer.MIN_VALUE, map.get(2));
+    map.get(256); // this should throw ArrayIndexOutOfBoundsException
+  }
+
+  @Test
+  public void testResizing() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    int[] expectedValues = new int[] {1, 32, 64, 150, 256, 4000};
+
+    for(int i=0; i<expectedValues.length; i++) {
+      map.put(expectedValues[i], expectedValues[i]);
+    }
+
+    for(int i=0; i<expectedValues.length; i++) {
+      assertEquals(expectedValues[i], map.get(expectedValues[i]));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidKeyBelowMinValueSupported() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    map.put(-1, 0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidKeyAboveMaxKeyValueSupported() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    map.put(1 << 16, 0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidValuePut() {
+    ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+    map.put(1, -1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1f08574..0e56ed6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,6 +389,9 @@
             <additionalClasspathElements>
               <additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement>
             </additionalClasspathElements>
+            <systemPropertyVariables>
+              <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+            </systemPropertyVariables>
           </configuration>
         </plugin>
         <plugin>