You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/09 09:23:51 UTC

[4/8] drill git commit: DRILL-2187: Single Broadcast Sender

DRILL-2187: Single Broadcast Sender

Also includes:
1. Fix merge join planning issue (1c5c810 by jinfengni)
2. ExternalSort: Check the memory available for in-memory sorting or not in making decision to spill or not (36f9dd1)
3. Cleanup in ExternalSortBatch and its helper classes (36f9dd1)
4. MergeJoinBatch: Limit the outgoing record batch size to 2^15 (37dfeb8)
5. StreamingAggBatch: Limit outgoing record batch size to 2^15 (7d8a2e4)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9fd1430d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9fd1430d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9fd1430d

Branch: refs/heads/master
Commit: 9fd1430db69309c2e476007de851142f778ddb62
Parents: 6be9e3b
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Feb 1 08:43:47 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun Mar 8 22:48:45 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/io/netty/buffer/DrillBuf.java |  78 +-
 .../java/io/netty/buffer/FakeAllocator.java     |   6 +
 .../org/apache/drill/exec/memory/Accountor.java |  12 +-
 .../drill/exec/memory/BufferAllocator.java      |   9 +-
 .../drill/exec/memory/TopLevelAllocator.java    |  17 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  10 +-
 .../exec/physical/impl/SingleSenderCreator.java |   2 +-
 .../impl/aggregate/StreamingAggTemplate.java    |  90 +-
 .../BroadcastSenderRootExec.java                |  64 +-
 .../exec/physical/impl/join/JoinStatus.java     |   8 +-
 .../exec/physical/impl/join/JoinTemplate.java   |  10 +-
 .../PartitionSenderRootExec.java                |   3 +-
 .../partitionsender/PartitionerTemplate.java    |   2 +-
 .../impl/sort/SortRecordBatchBuilder.java       |  47 +-
 .../physical/impl/xsort/ExternalSortBatch.java  | 114 +--
 .../exec/physical/impl/xsort/MSortTemplate.java |  14 +-
 .../impl/xsort/PriorityQueueCopier.java         |   3 +
 .../impl/xsort/PriorityQueueCopierTemplate.java |   7 +-
 .../exec/planner/logical/DrillRuleSets.java     |  13 +-
 .../planner/physical/BroadcastExchangePrel.java |  13 +-
 .../physical/DrillDistributionTraitDef.java     |   3 +-
 .../exec/planner/physical/HashJoinPrule.java    |  25 +-
 .../exec/planner/physical/JoinPruleBase.java    |  79 +-
 .../exec/planner/physical/MergeJoinPrel.java    |   1 +
 .../exec/planner/physical/MergeJoinPrule.java   |  23 +-
 .../exec/planner/physical/PlannerSettings.java  |   7 +-
 .../exec/planner/physical/ProjectPrule.java     |   2 +-
 .../exec/planner/physical/StreamAggPrule.java   |   6 +-
 .../planner/physical/SubsetTransformer.java     |  14 +-
 .../drill/exec/proto/helper/QueryIdHelper.java  |   8 +
 .../exec/record/FragmentWritableBatch.java      |  37 +-
 .../drill/exec/record/RawFragmentBatch.java     |  17 +-
 .../drill/exec/record/VectorAccessible.java     |   7 -
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |   1 +
 .../apache/drill/exec/rpc/data/AckSender.java   |  63 ++
 .../apache/drill/exec/rpc/data/DataClient.java  |   2 +-
 .../exec/rpc/data/DataConnectionCreator.java    |   4 +-
 .../exec/rpc/data/DataConnectionManager.java    |  18 +-
 .../rpc/data/DataProtobufLengthDecoder.java     |  19 +-
 .../exec/rpc/data/DataResponseHandler.java      |   7 +-
 .../exec/rpc/data/DataResponseHandlerImpl.java  |  28 +-
 .../drill/exec/rpc/data/DataRpcConfig.java      |   2 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |  80 +-
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/work/batch/ResponseSenderQueue.java    |  11 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   2 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  10 +-
 .../impl/broadcastsender/TestBroadcast.java     |  50 ++
 .../apache/drill/exec/server/TestBitRpc.java    |  19 +-
 .../work/batch/TestUnlimitedBatchBuffer.java    |  34 +-
 .../test/resources/broadcast/customer/cust.json |   4 +
 .../src/test/resources/broadcast/sales/f1.json  |   2 +
 .../src/test/resources/broadcast/sales/f2.json  |   2 +
 .../org/apache/drill/exec/proto/BitData.java    | 879 ++++++++++---------
 .../apache/drill/exec/proto/SchemaBitData.java  |  71 +-
 .../exec/proto/beans/BitClientHandshake.java    |  24 -
 .../exec/proto/beans/FragmentRecordBatch.java   | 114 ++-
 protocol/src/main/protobuf/BitData.proto        |  15 +-
 58 files changed, 1324 insertions(+), 889 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 8e9d395..d87fb76 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,6 +27,7 @@ import java.nio.ByteOrder;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.exec.memory.Accountor;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +44,7 @@ public final class DrillBuf extends AbstractByteBuf {
   private final long addr;
   private final int offset;
   private final boolean rootBuffer;
-
+  private final AtomicLong rootRefCnt = new AtomicLong(1);
   private volatile BufferAllocator allocator;
   private volatile Accountor acct;
   private volatile int length;
@@ -83,11 +84,36 @@ public final class DrillBuf extends AbstractByteBuf {
     this.acct = a;
     this.length = 0;
     this.addr = 0;
-    this.rootBuffer = true;
+    this.rootBuffer = false;
     this.offset = 0;
   }
 
+  /**
+   * Special constructor used for RPC ownership transfer.  Takes a snapshot slice of the current buf
+   *  but points directly to the underlying UnsafeLittleEndian buffer.  Does this by calling unwrap()
+   *  twice on the provided DrillBuf and expecting an UnsafeDirectLittleEndian buffer. This operation
+   *  includes taking a new reference count on the underlying buffer and maintaining returning with a
+   *  current reference count for itself (masking the underlying reference count).
+   * @param allocator
+   * @param a Allocator used when users try to receive allocator from buffer.
+   * @param b Accountor used for accounting purposes.
+   */
+  public DrillBuf(BufferAllocator allocator, Accountor a, DrillBuf b) {
+    this(allocator, a, getUnderlying(b), b, 0, b.length, true);
+    assert b.unwrap().unwrap() instanceof UnsafeDirectLittleEndian;
+    b.unwrap().unwrap().retain();
+  }
+
+
   private DrillBuf(DrillBuf buffer, int index, int length) {
+    this(buffer.allocator, null, buffer, buffer, index, length, false);
+  }
+
+  private static ByteBuf getUnderlying(DrillBuf b){
+    ByteBuf underlying = b.unwrap().unwrap();
+    return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
+  }
+  private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
     super(length);
     if (index < 0 || index > buffer.capacity() - length) {
       throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + index + ", " + length + ')');
@@ -96,13 +122,13 @@ public final class DrillBuf extends AbstractByteBuf {
     this.length = length;
     writerIndex(length);
 
-    this.b = buffer;
+    this.b = replacement;
     this.addr = buffer.memoryAddress() + index;
     this.offset = index;
-    this.acct = null;
+    this.acct = a;
     this.length = length;
-    this.rootBuffer = false;
-    this.allocator = buffer.allocator;
+    this.rootBuffer = root;
+    this.allocator = allocator;
   }
 
   public void setOperatorContext(OperatorContext c) {
@@ -132,7 +158,12 @@ public final class DrillBuf extends AbstractByteBuf {
 
   @Override
   public int refCnt() {
-    return b.refCnt();
+    if(rootBuffer){
+      return (int) this.rootRefCnt.get();
+    }else{
+      return b.refCnt();
+    }
+
   }
 
   private long addr(int index) {
@@ -203,20 +234,26 @@ public final class DrillBuf extends AbstractByteBuf {
 
   @Override
   public synchronized boolean release() {
-    if (b.release() && rootBuffer) {
-      acct.release(this, length);
-      return true;
-    }
-    return false;
+    return release(1);
   }
 
+  /**
+   * Release the provided number of reference counts.  If this is a root buffer, will decrease accounting if the local reference count returns to zero.
+   */
   @Override
   public synchronized boolean release(int decrement) {
-    if (b.release(decrement) && rootBuffer) {
-      acct.release(this, length);
-      return true;
+
+    if(rootBuffer){
+      if(0 == this.rootRefCnt.addAndGet(-decrement)){
+        b.release(decrement);
+        acct.release(this, length);
+        return true;
+      }else{
+        return false;
+      }
+    }else{
+      return b.release(decrement);
     }
-    return false;
   }
 
   @Override
@@ -405,14 +442,17 @@ public final class DrillBuf extends AbstractByteBuf {
 
   @Override
   public ByteBuf retain(int increment) {
-    b.retain(increment);
+    if(rootBuffer){
+      this.rootRefCnt.addAndGet(increment);
+    }else{
+      b.retain(increment);
+    }
     return this;
   }
 
   @Override
   public ByteBuf retain() {
-    b.retain();
-    return this;
+    return retain(1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
index 3de0a75..721aff9 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.util.Pointer;
 
 class FakeAllocator implements BufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FakeAllocator.class);
@@ -155,4 +156,9 @@ class FakeAllocator implements BufferAllocator {
 
   }
 
+  @Override
+  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 2b48ef0..eb932ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -110,8 +110,18 @@ public class Accountor {
   }
 
   public boolean transferTo(Accountor target, DrillBuf buf, long size) {
+    return transfer(target, buf, size, true);
+  }
+
+  public boolean transferIn(DrillBuf buf, long size) {
+    return transfer(this, buf, size, false);
+  }
+
+  private boolean transfer(Accountor target, DrillBuf buf, long size, boolean release) {
     boolean withinLimit = target.forceAdditionalReservation(size);
-    release(buf, size);
+    if(release){
+      release(buf, size);
+    }
 
     if (ENABLE_ACCOUNTING) {
       target.buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace()));

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 83d9d1e..30b905f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -19,11 +19,12 @@ package org.apache.drill.exec.memory;
 
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
 
 import java.io.Closeable;
 
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.util.Pointer;
 
 /**
  * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
@@ -63,6 +64,12 @@ public interface BufferAllocator extends Closeable {
    */
   public boolean takeOwnership(DrillBuf buf) ;
 
+  /**
+   * Take over ownership of fragment accounting.  Always takes over ownership.
+   * @param buf
+   * @return false if over allocation.
+   */
+  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
 
   public PreAllocator getNewPreAllocator();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 67e1fdb..2a28bcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -25,15 +25,14 @@ import io.netty.buffer.UnsafeDirectLittleEndian;
 import java.util.IdentityHashMap;
 import java.util.HashMap;
 import java.util.Map;
-
 import java.util.Map.Entry;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
-
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.Pointer;
 
 public class TopLevelAllocator implements BufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
@@ -75,6 +74,13 @@ public class TopLevelAllocator implements BufferAllocator {
     return buf.transferAccounting(acct);
   }
 
+  @Override
+  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+    DrillBuf b = new DrillBuf(this, acct, buf);
+    out.value = b;
+    return acct.transferIn(b, b.capacity());
+  }
+
   public DrillBuf buffer(int min, int max) {
     if (min == 0) {
       return empty;
@@ -198,6 +204,13 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
+    public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+      DrillBuf b = new DrillBuf(this, acct, buf);
+      out.value = b;
+      return acct.transferIn(b, b.capacity());
+    }
+
+    @Override
     public DrillBuf buffer(int size, int max) {
       if (size == 0) {
         return empty;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e413921..108f5bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -60,7 +60,7 @@ public class FragmentContext implements Closeable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
 
-  private Map<FragmentHandle, DataTunnel> tunnels = Maps.newHashMap();
+  private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
 
   private final DrillbitContext context;
   private final UserClientConnection connection;
@@ -239,11 +239,11 @@ public class FragmentContext implements Closeable {
     return context.getController().getTunnel(endpoint);
   }
 
-  public DataTunnel getDataTunnel(DrillbitEndpoint endpoint, FragmentHandle remoteHandle) {
-    DataTunnel tunnel = tunnels.get(remoteHandle);
+  public DataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+    DataTunnel tunnel = tunnels.get(endpoint);
     if (tunnel == null) {
-      tunnel = context.getDataConnectionsPool().getTunnel(endpoint, remoteHandle);
-      tunnels.put(remoteHandle, tunnel);
+      tunnel = context.getDataConnectionsPool().getTunnel(endpoint);
+      tunnels.put(endpoint, tunnel);
     }
     return tunnel;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 6db9f4a..812c89c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -80,7 +80,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       this.config = config;
       this.recMajor = config.getOppositeMajorFragmentId();
       FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build();
-      this.tunnel = context.getDataTunnel(config.getDestination(), opposite);
+      this.tunnel = context.getDataTunnel(config.getDestination());
       this.context = context;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 14e6aff..36f9f29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -29,7 +29,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
-  private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
+  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+
   private IterOutcome lastOutcome = null;
   private boolean first = true;
   private boolean newSchema = false;
@@ -37,14 +38,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   private int underlyingIndex = 0;
   private int currentIndex;
   private int addedRecordCount = 0;
-  private boolean pendingOutput = false;
   private IterOutcome outcome;
   private int outputCount = 0;
   private RecordBatch incoming;
-  private BatchSchema schema;
   private StreamingAggBatch outgoing;
   private FragmentContext context;
-  private InternalBatch remainderBatch;
   private boolean done = false;
 
 
@@ -52,7 +50,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
-    this.schema = incoming.getSchema();
     this.outgoing = outgoing;
     setupInterior(incoming, outgoing);
   }
@@ -74,12 +71,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     return outputCount;
   }
 
-  private AggOutcome tooBigFailure() {
-    context.fail(new Exception(TOO_BIG_ERROR));
-    this.outcome = IterOutcome.STOP;
-    return AggOutcome.CLEANUP_AND_RETURN;
-  }
-
   @Override
   public AggOutcome doWork() {
     if (done) {
@@ -118,25 +109,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         }
       }
 
-      // pick up a remainder batch if we have one.
-      if (remainderBatch != null) {
-        outputToBatch( previousIndex );
-        remainderBatch.clear();
-        remainderBatch = null;
-        return setOkAndReturn();
-      }
-
-
-      // setup for new output and pick any remainder.
-      if (pendingOutput) {
-        allocateOutgoing();
-        pendingOutput = false;
-        if (EXTRA_DEBUG) {
-          logger.debug("Attempting to output remainder.");
-        }
-        outputToBatch( previousIndex);
-      }
-
       if (newSchema) {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
@@ -167,18 +139,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
             if (EXTRA_DEBUG) {
               logger.debug("Values were different, outputting previous batch.");
             }
-            outputToBatch(previousIndex);
-            if (EXTRA_DEBUG) {
-              logger.debug("Output successful.");
+            if(!outputToBatch(previousIndex)) {
+              // There is still space in outgoing container, so proceed to the next input.
+              if (EXTRA_DEBUG) {
+                logger.debug("Output successful.");
+              }
+              addRecordInc(currentIndex);
+            } else {
+              if (EXTRA_DEBUG) {
+                logger.debug("Output container has reached its capacity. Flushing it.");
+              }
+
+              // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls.
+              previousIndex = currentIndex;
+              incIndex();
+              return setOkAndReturn();
             }
-            addRecordInc(currentIndex);
           }
           previousIndex = currentIndex;
         }
 
-
         InternalBatch previous = null;
-
         try {
           while (true) {
             if (previous != null) {
@@ -196,7 +177,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();
               } else if(addedRecordCount > 0) {
-                outputToBatchPrev( previous, previousIndex, outputCount);
+                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+                // (output container full or not) as we are not going to insert anymore records.
                 if (EXTRA_DEBUG) {
                   logger.debug("Received no more batches, returning.");
                 }
@@ -218,7 +200,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
               }
               if (addedRecordCount > 0) {
-                outputToBatchPrev( previous, previousIndex, outputCount);
+                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+                // (output container full or not) as we are not going to insert anymore records.
                 if (EXTRA_DEBUG) {
                   logger.debug("Wrote out end of previous batch, returning.");
                 }
@@ -249,7 +232,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                   previousIndex = currentIndex;
                   if (addedRecordCount > 0) {
-                    outputToBatchPrev( previous, previousIndex, outputCount);
+                    if (!outputToBatchPrev(previous, previousIndex, outputCount)) {
+                      if (EXTRA_DEBUG) {
+                        logger.debug("Output container is full. flushing it.");
+                        return setOkAndReturn();
+                      }
+                    }
                     continue outside;
                   }
                 }
@@ -263,8 +251,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
         }
         } finally {
-          // make sure to clear previous if we haven't saved it.
-          if (remainderBatch == null && previous != null) {
+          // make sure to clear previous
+          if (previous != null) {
             previous.clear();
           }
         }
@@ -303,7 +291,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     return AggOutcome.RETURN_OUTCOME;
   }
 
-  private final void outputToBatch(int inIndex) {
+  // Returns output container status after insertion of the given record. Caller must check the return value if it
+  // plans to insert more records into outgoing container.
+  private final boolean outputToBatch(int inIndex) {
+    assert outputCount < OUTPUT_BATCH_SIZE:
+        "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
 
     outputRecordKeys(inIndex, outputCount);
 
@@ -315,15 +307,24 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
+
+    return outputCount == OUTPUT_BATCH_SIZE;
   }
 
-  private final void outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+  // Returns output container status after insertion of the given record. Caller must check the return value if it
+  // plans to inserts more record into outgoing container.
+  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+    assert outputCount < OUTPUT_BATCH_SIZE:
+        "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
+
     outputRecordKeysPrev(b1, inIndex, outIndex);
     outputRecordValues(outIndex);
     resetValues();
     resetValues();
     outputCount++;
     addedRecordCount = 0;
+
+    return outputCount == OUTPUT_BATCH_SIZE;
   }
 
   private void addRecordInc(int index) {
@@ -333,9 +334,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
   @Override
   public void cleanup() {
-    if (remainderBatch != null) {
-      remainderBatch.clear();
-    }
   }
 
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 22fa047..c255033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -43,6 +43,8 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.work.ErrorHelper;
 
+import com.google.common.collect.ArrayListMultimap;
+
 /**
  * Broadcast Sender broadcasts incoming batches to all receivers (one or more).
  * This is useful in cases such as broadcast join where sending the entire table to join
@@ -52,6 +54,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
   private final FragmentContext context;
   private final BroadcastSender config;
+
+  private final int[][] receivingMinorFragments;
   private final DataTunnel[] tunnels;
   private final ExecProtos.FragmentHandle handle;
   private volatile boolean ok;
@@ -70,20 +74,40 @@ public class BroadcastSenderRootExec extends BaseRootExec {
                                  RecordBatch incoming,
                                  BroadcastSender config) throws OutOfMemoryException {
     super(context, new OperatorContext(config, context, null, false), config);
-    //super(context, config);
     this.ok = true;
     this.context = context;
     this.incoming = incoming;
     this.config = config;
     this.handle = context.getHandle();
     List<DrillbitEndpoint> destinations = config.getDestinations();
-    this.tunnels = new DataTunnel[destinations.size()];
+    ArrayListMultimap<DrillbitEndpoint, Integer> dests = ArrayListMultimap.create();
+
     for(int i = 0; i < destinations.size(); ++i) {
-      FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
-      tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
+      dests.put(destinations.get(i), i);
     }
+
+    int destCount = dests.keySet().size();
+    int i = 0;
+
+    this.tunnels = new DataTunnel[destCount];
+    this.receivingMinorFragments = new int[destCount][];
+    for(DrillbitEndpoint ep : dests.keySet()){
+      List<Integer> minorsList= dests.get(ep);
+      int[] minorsArray = new int[minorsList.size()];
+      int x = 0;
+      for(Integer m : minorsList){
+        minorsArray[x++] = m;
+      }
+      receivingMinorFragments[i] = minorsArray;
+      tunnels[i] = context.getDataTunnel(ep);
+      i++;
+    }
+
+
   }
 
+
+
   @Override
   public boolean innerNext() {
     if(!ok) {
@@ -97,14 +121,15 @@ public class BroadcastSenderRootExec extends BaseRootExec {
       case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
-          FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
+          FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i]);
           stats.startWait();
           try {
             tunnels[i].sendRecordBatch(this.statusHandler, b2);
+            statusHandler.sendCount.increment();
           } finally {
             stats.stopWait();
           }
-          statusHandler.sendCount.increment();
+
         }
 
         return false;
@@ -116,15 +141,15 @@ public class BroadcastSenderRootExec extends BaseRootExec {
           writableBatch.retainBuffers(tunnels.length - 1);
         }
         for (int i = 0; i < tunnels.length; ++i) {
-          FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
+          FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i], writableBatch);
           updateStats(batch);
           stats.startWait();
           try {
             tunnels[i].sendRecordBatch(this.statusHandler, batch);
+            statusHandler.sendCount.increment();
           } finally {
             stats.stopWait();
           }
-          statusHandler.sendCount.increment();
         }
 
         return ok;
@@ -140,29 +165,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
   }
 
-  /*
-  private boolean waitAllFutures(boolean haltOnError) {
-    for (DrillRpcFuture<?> responseFuture : responseFutures) {
-      try {
-        GeneralRPCProtos.Ack ack = (GeneralRPCProtos.Ack) responseFuture.checkedGet();
-        if(!ack.getOk()) {
-          ok = false;
-          if (haltOnError) {
-            return false;
-          }
-        }
-      } catch (RpcException e) {
-        logger.error("Error sending batch to receiver: " + e);
-        ok = false;
-        if (haltOnError) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-*/
-
   @Override
   public void stop() {
       ok = false;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8dfc8f1..299fd3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -29,7 +29,9 @@ import org.eigenbase.rel.JoinRelType;
  * The status of the current join.  Maintained outside the individually compiled join templates so that we can carry status across multiple schemas.
  */
 public final class JoinStatus {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+
+  private static final int OUTPUT_BATCH_SIZE = 32*1024;
 
   public static enum RightSourceMode {
     INCOMING, SV4;
@@ -165,6 +167,10 @@ public final class JoinStatus {
     outputPosition = 0;
   }
 
+  public final boolean isOutgoingBatchFull() {
+    return outputPosition == OUTPUT_BATCH_SIZE;
+  }
+
   public final void incOutputPos() {
     outputPosition++;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 4124e6d..7da9788 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -75,8 +75,6 @@ import org.eigenbase.rel.JoinRelType;
  */
 public abstract class JoinTemplate implements JoinWorker {
 
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
-
   @Override
   public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
     doSetup(context, status, outgoing);
@@ -88,7 +86,7 @@ public abstract class JoinTemplate implements JoinWorker {
    * @return  true of join succeeded; false if the worker needs to be regenerated
    */
   public final boolean doJoin(final JoinStatus status) {
-    for (int i = 0; i < OUTPUT_BATCH_SIZE; i++) {
+    while(!status.isOutgoingBatchFull()) {
       // for each record
 
       // validate input iterators (advancing to the next record batch if necessary)
@@ -96,6 +94,9 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining values from the left batch
           while (status.isLeftPositionAllowed()) {
+            if (status.isOutgoingBatchFull()) {
+              return false;
+            }
             doCopyLeft(status.getLeftPosition(), status.getOutPosition());
 
             status.incOutputPos();
@@ -139,6 +140,9 @@ public abstract class JoinTemplate implements JoinWorker {
         boolean crossedBatchBoundaries = false;
         int initialRightPosition = status.getRightPosition();
         do {
+          if (status.isOutgoingBatchFull()) {
+            return false;
+          }
           // copy all equal right keys to the output record batch
           doCopyLeft(status.getLeftPosition(), status.getOutPosition());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index a23bd7a..200e78e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -271,8 +271,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     int fieldId = 0;
     StatusHandler statusHandler = new StatusHandler(sendCount, context);
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
-      FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
-      DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
+      DataTunnel tunnel = context.getDataTunnel(endpoint);
       FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
           isLast,
           handle.getQueryId(),

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 71ffd41..79076cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -97,7 +97,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
       outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
-          context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId, statusHandler));
+          context.getDataTunnel(endpoint), context, oContext.getAllocator(), fieldId, statusHandler));
       fieldId++;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 707c41c..e559ece 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.impl.sort;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
@@ -63,7 +65,8 @@ public class SortRecordBatchBuilder {
   }
 
   /**
-   * Add another record batch to the set of record batches.
+   * Add another record batch to the set of record batches. TODO: Refactor this and other {@link #add
+   * (RecordBatchData)} method into one method.
    * @param batch
    * @return True if the requested add completed successfully.  Returns false in the case that this builder is full and cannot receive additional packages.
    * @throws SchemaChangeException
@@ -98,19 +101,28 @@ public class SortRecordBatchBuilder {
     return true;
   }
 
-  public boolean add(RecordBatchData rbd) {
+  public void add(RecordBatchData rbd) {
     long batchBytes = getSize(rbd.getContainer());
     if (batchBytes == 0 && batches.size() > 0) {
-      return true;
+      return;
     }
+
     if(batchBytes + runningBytes > maxBytes) {
-      return false; // enough data memory.
+      final String errMsg = String.format("Adding this batch causes the total size to exceed max allowed size. " +
+          "Current runningBytes %d, Incoming batchBytes %d. maxBytes %d", runningBytes, batchBytes, maxBytes);
+      logger.error(errMsg);
+      throw new DrillRuntimeException(errMsg);
     }
-    if(runningBatches+1 > Character.MAX_VALUE) {
-      return false; // allowed in batch.
+    if(runningBatches >= Character.MAX_VALUE) {
+      final String errMsg = String.format("Tried to add more than %d number of batches.", Character.MAX_VALUE);
+      logger.error(errMsg);
+      throw new DrillRuntimeException(errMsg);
     }
     if(!svAllocator.preAllocate(rbd.getRecordCount()*4)) {
-      return false;  // sv allocation available.
+      final String errMsg = String.format("Failed to pre-allocate memory for SV. " + "Existing recordCount*4 = %d, " +
+          "incoming batch recordCount*4 = %d", recordCount * 4, rbd.getRecordCount() * 4);
+      logger.error(errMsg);
+      throw new DrillRuntimeException(errMsg);
     }
 
 
@@ -120,12 +132,11 @@ public class SortRecordBatchBuilder {
       if (sv2 != null) {
         sv2.clear();
       }
-      return true;
+      return;
     }
     runningBytes += batchBytes;
     batches.put(rbd.getContainer().getSchema(), rbd);
     recordCount += rbd.getRecordCount();
-    return true;
   }
 
   public void canonicalize() {
@@ -149,7 +160,12 @@ public class SortRecordBatchBuilder {
     if (batches.keys().size() < 1) {
       assert false : "Invalid to have an empty set of batches with no schemas.";
     }
-    sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+
+    final DrillBuf svBuffer = svAllocator.getAllocation();
+    if (svBuffer == null) {
+      throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
+    }
+    sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
 
@@ -225,4 +241,15 @@ public class SortRecordBatchBuilder {
     return containerList;
   }
 
+  /**
+   * For given recordcount how muchmemory does SortRecordBatchBuilder needs for its own purpose. This is used in
+   * ExternalSortBatch to make decisions about whether to spill or not.
+   *
+   * @param recordCount
+   * @return
+   */
+  public static long memoryNeeded(int recordCount) {
+    // We need 4 bytes (SV4) for each record.
+    return recordCount * 4;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index f320bbb..c03d6a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -65,7 +65,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterators;
@@ -77,27 +76,24 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
 
   private static final long MAX_SORT_BYTES = 1L * 1024 * 1024 * 1024;
-  public static int SPILL_TARGET_RECORD_COUNT;
-  public static int TARGET_RECORD_COUNT;
-  public static int SPILL_BATCH_GROUP_SIZE;
-  public static int SPILL_THRESHOLD;
-  public static List<String> SPILL_DIRECTORIES;
-  private Iterator<String> dirs;
-
-  public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
-  public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
-
-
+  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+  private static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
+
+  private final int SPILL_BATCH_GROUP_SIZE;
+  private final int SPILL_THRESHOLD;
+  private final List<String> SPILL_DIRECTORIES;
+  private final Iterator<String> dirs;
   private final RecordBatch incoming;
+  private final BufferAllocator copierAllocator;
+
   private BatchSchema schema;
   private SingleBatchSorter sorter;
   private SortRecordBatchBuilder builder;
   private MSorter mSorter;
   private PriorityQueueCopier copier;
-  private BufferAllocator copierAllocator;
   private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
   private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
   private SelectionVector4 sv4;
@@ -105,12 +101,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private int spillCount = 0;
   private int batchesSinceLastSpill = 0;
   private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
-  private boolean useIncomingSchema = false;
   private boolean first = true;
   private long totalSizeInMemory = 0;
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
-  private boolean stop = false;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
@@ -123,54 +117,29 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    SPILL_TARGET_RECORD_COUNT = config.getInt(ExecConstants.EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE);
-    TARGET_RECORD_COUNT = config.getInt(ExecConstants.EXTERNAL_SORT_TARGET_BATCH_SIZE);
     SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
     SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
     SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
     dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
     uid = System.nanoTime();
-    copierAllocator = oContext.getAllocator().getChildAllocator(context, 10000000, 20000000, true);
+    copierAllocator = oContext.getAllocator().getChildAllocator(
+        context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
   }
 
   @Override
   public int getRecordCount() {
     if (sv4 != null) {
       return sv4.getCount();
-    } else {
-      return container.getRecordCount();
     }
-  }
-
-  @Override
-  public void kill(boolean sendUpstream) {
-    incoming.kill(sendUpstream);
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
+    return container.getRecordCount();
   }
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return this.sv4;
+    return sv4;
   }
 
   @Override
-  public BatchSchema getSchema() {
-    if (useIncomingSchema) {
-      List<MaterializedField> fields = Lists.newArrayList();
-      for (MaterializedField field : incoming.getSchema()) {
-        fields.add(field);
-      }
-      return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
-    }
-    return super.getSchema();
-  }
-
-
-  @Override
   public void cleanup() {
     if (batchGroups != null) {
       for (BatchGroup group: batchGroups) {
@@ -203,16 +172,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         for (VectorWrapper w : incoming) {
           ValueVector v = container.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
-            w.getValueVector().makeTransferPair(v);
+            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
             v.clear();
           }
-          v.allocateNew();
+          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
         }
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(0);
         return;
       case STOP:
-        stop = true;
       case NONE:
         state = BatchState.DONE;
       default:
@@ -224,13 +192,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public IterOutcome innerNext() {
     if (schema != null) {
       if (spillCount == 0) {
-        if (schema != null) {
-          if (getSelectionVector4().next()) {
-            return IterOutcome.OK;
-          } else {
-            return IterOutcome.NONE;
-          }
-        }
+        return (getSelectionVector4().next()) ? IterOutcome.OK : IterOutcome.NONE;
       } else {
         Stopwatch w = new Stopwatch();
         w.start();
@@ -247,7 +209,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       }
     }
 
-    long totalcount = 0;
+    int totalCount = 0;
 
     try{
       container.clear();
@@ -309,7 +271,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             }
           }
           int count = sv2.getCount();
-          totalcount += count;
+          totalCount += count;
 //          if (count == 0) {
 //            break outer;
 //          }
@@ -324,10 +286,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
           batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
           batchesSinceLastSpill++;
-          if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
-                  (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
-                  (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
-                  (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+          if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
+              (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+              // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+              (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
+              // current memory used is more than 95% of memory usage limit of this operator
+              (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+              // current memory used is more than 95% of memory usage limit of this fragment
+              (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
+              // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
+              // since the last spill exceed the defined limit
+              (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+
             mergeAndSpill();
             batchesSinceLastSpill = 0;
           }
@@ -346,7 +316,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         }
       }
 
-      if (totalcount == 0) {
+      if (totalCount == 0) {
         return IterOutcome.NONE;
       }
       if (spillCount == 0) {
@@ -378,10 +348,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
         createCopier(hyperBatch, batchGroups, container);
-        int inMemoryRecordCount = 0;
-        for (BatchGroup g : batchGroups) {
-          inMemoryRecordCount += g.getRecordCount();
-        }
+
         int estimatedRecordSize = 0;
         for (VectorWrapper w : batchGroups.get(0)) {
           try {
@@ -390,7 +357,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             estimatedRecordSize += 50;
           }
         }
-        targetRecordCount = (int) Math.max(1, 250 * 1000 / estimatedRecordSize);
+        targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, 250 * 1000 / estimatedRecordSize));
         int count = copier.next(targetRecordCount);
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(count);
@@ -408,6 +375,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
   }
 
+  private boolean hasMemoryForInMemorySort(int currentRecordCount) {
+    long currentlyAvailable =  popConfig.getMaxAllocation() - oContext.getAllocator().getAllocatedMemory();
+
+    long neededForInMemorySort = SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
+        MSortTemplate.memoryNeeded(currentRecordCount);
+
+    return currentlyAvailable > neededForInMemorySort;
+  }
+
   public void mergeAndSpill() throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
     VectorContainer outputContainer = new VectorContainer();

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 3fd744f..94bc3a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Queues;
 public abstract class MSortTemplate implements MSorter, IndexedSortable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
-  private BufferAllocator allocator;
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
   private long compares;
@@ -46,7 +45,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
 
   @Override
   public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
-    this.allocator = allocator;
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
     this.vector4 = vector4.createNewWrapperCurrent();
@@ -70,6 +68,18 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
     aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
   }
 
+  /**
+   * For given recordCount how much memory does MSorter needs for its own purpose. This is used in
+   * ExternalSortBatch to make decisions about whether to spill or not.
+   *
+   * @param recordCount
+   * @return
+   */
+  public static long memoryNeeded(int recordCount) {
+    // We need 4 bytes (SV4) for each record.
+    return recordCount * 4;
+  }
+
   private int merge(int leftStart, int rightStart, int rightEnd, int outStart) {
     int l = leftStart;
     int r = rightStart;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index d427744..161ca6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -26,6 +26,9 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 
 public interface PriorityQueueCopier {
+  public static long initialAllocation = 10000000;
+  public static long maxAllocation = 20000000;
+
   public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
                     VectorAccessible outgoing) throws SchemaChangeException;
   public int next(int targetRecordCount);

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index fe67064..f7786b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -30,23 +30,18 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.AllocationHelper;
 
 public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
 
   private SelectionVector4 vector4;
   private List<BatchGroup> batchGroups;
   private VectorAccessible hyperBatch;
-  private FragmentContext context;
-  private BufferAllocator allocator;
   private VectorAccessible outgoing;
   private int size;
   private int queueSize = 0;
-  private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
 
   @Override
   public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
                     VectorAccessible outgoing) throws SchemaChangeException {
-    this.context = context;
-    this.allocator = allocator;
     this.hyperBatch = hyperBatch;
     this.batchGroups = batchGroups;
     this.outgoing = outgoing;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 9c59a9e..496bc9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -162,11 +162,20 @@ public class DrillRuleSets {
     }
 
     if (ps.isHashJoinEnabled()) {
-      ruleList.add(HashJoinPrule.INSTANCE);
+      ruleList.add(HashJoinPrule.DIST_INSTANCE);
+
+      if(ps.isBroadcastJoinEnabled()){
+        ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+      }
     }
 
     if (ps.isMergeJoinEnabled()) {
-      ruleList.add(MergeJoinPrule.INSTANCE);
+      ruleList.add(MergeJoinPrule.DIST_INSTANCE);
+
+      if(ps.isBroadcastJoinEnabled()){
+        ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
+      }
+
     }
 
     return new DrillRuleSet(ImmutableSet.copyOf(ruleList));

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index 0c76de4..0467a07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -52,11 +52,14 @@ public class BroadcastExchangePrel extends ExchangePrel{
 
     RelNode child = this.getChild();
 
-    double inputRows = RelMetadataQuery.getRowCount(child);
-    int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
-    double cpuCost = DrillCostBase.SVR_CPU_COST * inputRows ;
-    int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
-    double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+    final int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
+    final double broadcastFactor = PrelUtil.getSettings(getCluster()).getBroadcastFactor();
+    final double inputRows = RelMetadataQuery.getRowCount(child);
+
+    final int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
+    final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows ;
+    final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+
     return new DrillCostBase(inputRows, cpuCost, 0, networkCost);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index 6a1dbb7..ae079a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitDef;
+import org.eigenbase.relopt.volcano.RelSubset;
 
 public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrait>{
   public static final DrillDistributionTraitDef INSTANCE = new DrillDistributionTraitDef();
@@ -67,7 +68,7 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
     // Source trait is "ANY", which is abstract type of distribution.
     // We do not want to convert from "ANY", since it's abstract.
     // Source trait should be concrete type: SINGLETON, HASH_DISTRIBUTED, etc.
-    if (currentDist.equals(DrillDistributionTrait.DEFAULT)) {
+    if (currentDist.equals(DrillDistributionTrait.DEFAULT) && !(rel instanceof RelSubset) ) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 433405a..e802a40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -25,15 +25,19 @@ import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
 import org.eigenbase.trace.EigenbaseTrace;
 
 public class HashJoinPrule extends JoinPruleBase {
-  public static final RelOptRule INSTANCE = new HashJoinPrule();
+  public static final RelOptRule DIST_INSTANCE = new HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
+  public static final RelOptRule BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+
   protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
 
-  private HashJoinPrule() {
-    super(
-        RelOptHelper.any(DrillJoinRel.class), "Prel.HashJoinPrule");
+  private final boolean isDist;
+  private HashJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
+    super(operand, name);
+    this.isDist = isDist;
   }
 
   @Override
@@ -60,14 +64,15 @@ public class HashJoinPrule extends JoinPruleBase {
 
     try {
 
-      createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
-
-      if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
-        createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
-
-        // createBroadcastPlan1(call, join, PhysicalJoinType.HASH_JOIN, left, right, null, null);
+      if(isDist){
+        createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
+      }else{
+        if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+          createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
+        }
       }
 
+
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index afcbf71..77c055c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelType;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.metadata.RelMetadataQuery;
@@ -73,13 +74,11 @@ public abstract class JoinPruleBase extends Prule {
   }
 
   protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
-    if (! PrelUtil.getPlannerSettings(planner).isBroadcastJoinEnabled()) {
-      return false;
-    }
 
     double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
     if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
         && ! left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
+        && (join.getJoinType() == JoinRelType.INNER || join.getJoinType() == JoinRelType.LEFT)
         ) {
       return true;
     }
@@ -175,43 +174,69 @@ public abstract class JoinPruleBase extends Prule {
 
     DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
+    RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+
     if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
       assert collationLeft != null && collationRight != null;
+      traitsLeft = traitsLeft.plus(collationLeft);
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
     } else {
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
     }
 
-    final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     final RelNode convertedLeft = convert(left, traitsLeft);
     final RelNode convertedRight = convert(right, traitsRight);
 
-    new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
-
-      @Override
-      public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
-        DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-        RelTraitSet newTraitsLeft;
-        if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
-        } else {
-          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
-        }
-        Character.digit(1, 1);
-        RelNode newLeft = convert(left, newTraitsLeft);
-        if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-          return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
-                                     join.getJoinType());
-        } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-          return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
-                                      join.getJoinType());
-        } else{
-          return null;
-        }
+    boolean traitProp = false;
+
+    if(traitProp){
+      if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+
+          @Override
+          public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+            DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+            RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
+
+            RelNode newLeft = convert(left, newTraitsLeft);
+              return new MergeJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+                                          join.getJoinType());
+          }
+
+        }.go(join, convertedLeft);
+
+
+      }else{
+
 
+        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+
+          @Override
+          public RelNode convertChild(final DrillJoinRel join,  final RelNode rel) throws InvalidRelException {
+            DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+            RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+            RelNode newLeft = convert(left, newTraitsLeft);
+            return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+                                         join.getJoinType());
+
+          }
+
+        }.go(join, convertedLeft);
       }
 
-    }.go(join, convertedLeft);
+    }else{
+      if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+        call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
+            join.getJoinType()));
+
+      }else{
+        call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
+                                       join.getJoinType()));
+      }
+    }
+
+
 
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index b7e86e3..fac18c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -116,4 +116,5 @@ public class MergeJoinPrel  extends JoinPrel {
     return SelectionVectorMode.NONE;
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index faffa63..5283467 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -29,18 +29,21 @@ import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.Lists;
 
 public class MergeJoinPrule extends JoinPruleBase {
-  public static final RelOptRule INSTANCE = new MergeJoinPrule();
+  public static final RelOptRule DIST_INSTANCE = new MergeJoinPrule("Prel.MergeJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
+  public static final RelOptRule BROADCAST_INSTANCE = new MergeJoinPrule("Prel.MergeJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+
   protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
 
-  private MergeJoinPrule() {
-    super(
-        RelOptHelper.any(DrillJoinRel.class),
-        "Prel.MergeJoinPrule");
+  final boolean isDist;
+  private MergeJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
+    super(operand, name);
+    this.isDist = isDist;
   }
 
   @Override
@@ -64,10 +67,12 @@ public class MergeJoinPrule extends JoinPruleBase {
       RelCollation collationLeft = getCollation(join.getLeftKeys());
       RelCollation collationRight = getCollation(join.getRightKeys());
 
-      createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
-
-      if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
-        createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight);
+      if(isDist){
+        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
+      }else{
+        if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+          createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight);
+        }
       }
 
     } catch (InvalidRelException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index bbfbbcb..ede0683 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -44,7 +44,8 @@ public class PlannerSettings implements Context{
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
   public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
   public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
-  public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000);
+  public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000000);
+  public static final OptionValidator BROADCAST_FACTOR = new RangeDoubleValidator("planner.broadcast_factor", 0, Double.MAX_VALUE, 1.0d);
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d);
   public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
   public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
@@ -80,6 +81,10 @@ public class PlannerSettings implements Context{
     return options.getOption(JOIN_ROW_COUNT_ESTIMATE_FACTOR.getOptionName()).float_val;
   }
 
+  public double getBroadcastFactor(){
+    return options.getOption(BROADCAST_FACTOR.getOptionName()).float_val;
+  }
+
   public boolean useDefaultCosting() {
     return useDefaultCosting;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 72034ed..b1d5a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -82,7 +82,7 @@ public class ProjectPrule extends Prule {
 
       DrillDistributionTrait newDist = convertDist(childDist, inToOut);
       RelCollation newCollation = convertRelCollation(childCollation, inToOut);
-      RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation);
+      RelTraitSet newProjectTraits = newTraitSet(Prel.DRILL_PHYSICAL, newDist, newCollation);
       return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(), project.getRowType());
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 89b133a..929cb6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -54,7 +54,7 @@ public class StreamAggPrule extends AggPruleBase {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
-    final RelNode input = aggregate.getChild();
+    RelNode input = aggregate.getChild();
     final RelCollation collation = getCollation(aggregate);
     RelTraitSet traits = null;
 
@@ -78,7 +78,7 @@ public class StreamAggPrule extends AggPruleBase {
             public RelNode convertChild(final DrillAggregateRel join, final RelNode rel) throws InvalidRelException {
               DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
               RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
-              RelNode newInput = convert(input, traits);
+              RelNode newInput = convert(rel, traits);
 
               StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                   aggregate.getGroupSet(),
@@ -130,7 +130,7 @@ public class StreamAggPrule extends AggPruleBase {
             public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode rel) throws InvalidRelException {
               DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
               RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist);
-              RelNode newInput = convert(input, traits);
+              RelNode newInput = convert(rel, traits);
 
               StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                   aggregate.getGroupSet(),

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
index d4cd21f..72e06d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.planner.physical;
 
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.ConventionTraitDef;
+import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTrait;
 import org.eigenbase.relopt.RelTraitSet;
@@ -49,21 +51,25 @@ public abstract class SubsetTransformer<T extends RelNode, E extends Exception>
     }
 
     boolean transform = false;
-
     for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
-      if (!isDefaultDist(rel)) {
-        RelNode out = convertChild(n, rel);
+      if (isPhysical(rel)) {
+        RelNode newRel = RelOptRule.convert(candidateSet, rel.getTraitSet().plus(Prel.DRILL_PHYSICAL));
+        RelNode out = convertChild(n, newRel);
         if (out != null) {
           call.transformTo(out);
           transform = true;
-
         }
       }
     }
 
+
     return transform;
   }
 
+  private boolean isPhysical(RelNode n){
+    return n.getTraitSet().getTrait(ConventionTraitDef.INSTANCE).equals(Prel.DRILL_PHYSICAL);
+  }
+
   private boolean isDefaultDist(RelNode n) {
     return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index bb12a22..1aadaa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -18,6 +18,8 @@
 
 package org.apache.drill.exec.proto.helper;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -39,4 +41,10 @@ public class QueryIdHelper {
   public static String getQueryIdentifier(FragmentHandle h) {
     return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
   }
+
+  public static String getQueryIdentifiers(QueryId queryId, int majorFragmentId, List<Integer> minorFragmentIds) {
+    String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
+    return getQueryId(queryId) + ":" + majorFragmentId + ":" + fragmentIds;
+  }
+
 }