Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/02 18:51:22 UTC

[1/6] git commit: DRILL-864: MergeJoinBatch fails to set record count in ValueVectors in container

Repository: incubator-drill
Updated Branches:
  refs/heads/master e1e5ea0ed -> 8490d7433


DRILL-864: MergeJoinBatch fails to set record count in ValueVectors in container


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/623a52e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/623a52e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/623a52e1

Branch: refs/heads/master
Commit: 623a52e11e94cdfeac1bb734890ec7155cc760d5
Parents: e1e5ea0
Author: vkorukanti <ve...@gmail.com>
Authored: Thu May 29 17:23:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:11:39 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/join/MergeJoinBatch.java    | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/623a52e1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 121cfec..b284454 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -181,18 +182,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         case BATCH_RETURNED:
           // only return new schema if new worker has been setup.
           logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
+          setRecordCountInContainer();
           return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
         case FAILURE:
           kill();
           return IterOutcome.STOP;
         case NO_MORE_DATA:
           logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+          setRecordCountInContainer();
           return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
         case SCHEMA_CHANGED:
           worker = null;
           if(status.getOutPosition() > 0){
             // if we have current data, let's return that.
             logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
+            setRecordCountInContainer();
             return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
           }else{
             // loop again to rebuild worker.
@@ -210,6 +214,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     }
   }
 
+  private void setRecordCountInContainer() {
+    for(VectorWrapper vw : container){
+      Preconditions.checkArgument(!vw.isHyper());
+      vw.getValueVector().getMutator().setValueCount(getRecordCount());
+    }
+  }
+
   public void resetBatchBuilder() {
     batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
   }
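
For context: downstream operators size their reads from each vector's accessor, so a batch whose vectors never had setValueCount() called appears empty even though data was written. The patch stamps the count on every vector before the batch is returned. A minimal sketch of the writer/reader contract, assuming a fixed-width IntVector 'v' with 'n' values already written (names illustrative):

    // Writer side: after populating the vector, publish the row count.
    v.getMutator().setValueCount(n);

    // Reader side: consumers iterate using the accessor's count, so an
    // unset count makes the written data invisible.
    for (int i = 0; i < v.getAccessor().getValueCount(); i++) {
      int value = v.getAccessor().get(i);
    }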


[3/6] git commit: DRILL-874 : Fix code to handle splitAndTransferTo in BitVector, when copy is required.

DRILL-874 : Fix code to handle splitAndTransferTo in BitVector, when copy is required.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d52e3251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d52e3251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d52e3251

Branch: refs/heads/master
Commit: d52e3251ba30edb7a5ab2fcc0e876fa8e1125bfd
Parents: 760cbd4
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Sun Jun 1 11:06:29 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:39 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/vector/BitVector.java | 29 +++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d52e3251/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 14324b0..e217ddb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -60,11 +60,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   private int getSizeFromCount(int valueCount) {
-    return (int) Math.ceil((float)valueCount / 8.0);
+    return (int) Math.ceil(valueCount / 8.0);
   }
 
   private int getByteIndex(int index) {
-    return (int) Math.floor((float) index / 8.0);
+    return (int) Math.floor(index / 8.0);
   }
 
   public void allocateNew() {
@@ -174,27 +174,30 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   public void splitAndTransferTo(int startIndex, int length, BitVector target) {
     assert startIndex + length <= valueCount;
     int firstByte = getByteIndex(startIndex);
-    int lastByte = getSizeFromCount(startIndex + length) - 1;
+    int byteSize = getSizeFromCount(length);
     int offset = startIndex % 8;
     if (offset == 0) {
       // slice
-      target.data = this.data.slice(firstByte, lastByte - firstByte + 1);
+      target.data = this.data.slice(firstByte, byteSize);
       target.data.retain();
     } else {
       // Copy data
+      // When the slice starts in the middle of a byte (offset != 0), copy data from the source BitVector.
+      // Each target byte combines part of the i-th source byte with part of the (i+1)-th source byte.
+      // The last byte copied to the target is a bit tricky:
+      //   1) if the length fills only part of a byte (length % 8 != 0), copy the remaining bits only.
+      //   2) otherwise, copy the last byte the same way as the prior bytes.
       target.clear();
       target.allocateNew(length);
-      if ((startIndex + length) % 8 == 0) {
-        lastByte++;
-      }
-      int i = firstByte;
       // TODO maybe do this one word at a time, rather than byte?
-      for (; i <= lastByte - 1; i++) {
-        target.data.setByte(i - firstByte, (((this.data.getByte(i) & 0xFF) >>> offset) + (this.data.getByte(i + 1) <<  (8 - offset))));
-      }
-      if (startIndex + length == this.valueCount) {
-        target.data.setByte(i - firstByte, ((this.data.getByte(lastByte) & 0xFF) >>> offset));
+      for (int i = 0; i < byteSize - 1; i++) {
+        target.data.setByte(i, (((this.data.getByte(firstByte + i) & 0xFF) >>> offset) + (this.data.getByte(firstByte + i + 1) <<  (8 - offset))));
       }
+      if (length % 8 != 0)
+        target.data.setByte(byteSize - 1, ((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset));
+      else
+        target.data.setByte(byteSize - 1,
+            (((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset) + (this.data.getByte(firstByte + byteSize) <<  (8 - offset))));
     }
   }
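
The heart of the fix is the recombination arithmetic for an unaligned slice: each target byte takes the top (8 - offset) bits of one source byte, shifted down, plus the low offset bits of the next source byte, shifted up. A standalone restatement of the copy branch over plain byte arrays (a sketch mirroring the patch's logic, assuming offset != 0, since the aligned case is handled by slicing):

    // Copy 'length' bits starting at bit 'startIndex' of src into dst.
    static void copyBits(byte[] src, byte[] dst, int startIndex, int length) {
      int firstByte = startIndex / 8;
      int byteSize = (length + 7) / 8;  // bytes needed to hold 'length' bits
      int offset = startIndex % 8;      // assumed non-zero here
      for (int i = 0; i < byteSize - 1; i++) {
        dst[i] = (byte) (((src[firstByte + i] & 0xFF) >>> offset)
                       + (src[firstByte + i + 1] << (8 - offset)));
      }
      if (length % 8 != 0) {
        // Partial final byte: only the remaining bits are copied.
        dst[byteSize - 1] = (byte) ((src[firstByte + byteSize - 1] & 0xFF) >>> offset);
      } else {
        // Full final byte: stitched from two source bytes like the others.
        dst[byteSize - 1] = (byte) (((src[firstByte + byteSize - 1] & 0xFF) >>> offset)
                                  + (src[firstByte + byteSize] << (8 - offset)));
      }
    }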
 


[6/6] git commit: DRILL-672: Queries against HBase table do not close after the data is returned.

DRILL-672: Queries against HBase table do not close after the data is returned.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8490d743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8490d743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8490d743

Branch: refs/heads/master
Commit: 8490d7433f9d9171971ae6e1af02cb67215cd8ce
Parents: d929faa
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Wed May 28 04:57:07 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:15:29 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  | 215 ++++++++++---
 .../exec/store/hbase/HBaseRecordReader.java     |   4 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |  14 +-
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  17 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   8 +-
 .../hbase/TestHBaseRegionScanAssignments.java   | 299 +++++++++++++++++++
 6 files changed, 505 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 809aa86..f3ff64c 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -18,12 +18,18 @@
 package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -38,12 +44,12 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
@@ -51,16 +57,26 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
+    @Override
+    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
+      return list1.size() - list2.size();
+    }
+  };
+
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
   private HBaseStoragePluginConfig storagePluginConfig;
 
   private List<SchemaPath> columns;
@@ -70,9 +86,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   private HBaseStoragePlugin storagePlugin;
 
   private Stopwatch watch = new Stopwatch();
-  private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings;
-  private List<EndpointAffinity> endpointAffinities;
-  private NavigableMap<HRegionInfo,ServerName> regionsToScan;
+
+  private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
+
+  private NavigableMap<HRegionInfo, ServerName> regionsToScan;
+
   private HTableDescriptor hTableDesc;
 
   private boolean filterPushedDown = false;
@@ -85,9 +103,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
   }
 
-  public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
-    this.storagePlugin = storageEngine;
-    this.storagePluginConfig = storageEngine.getConfig();
+  public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+    this.storagePlugin = storagePlugin;
+    this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
     this.columns = columns;
     init();
@@ -95,13 +113,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   /**
    * Private constructor, used for cloning.
-   * @param that The
+   * @param that The HBaseGroupScan to clone
    */
   private HBaseGroupScan(HBaseGroupScan that) {
     this.columns = that.columns;
-    this.endpointAffinities = that.endpointAffinities;
     this.hbaseScanSpec = that.hbaseScanSpec;
-    this.mappings = that.mappings;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
     this.regionsToScan = that.regionsToScan;
     this.storagePlugin = that.storagePlugin;
     this.storagePluginConfig = that.storagePluginConfig;
@@ -165,8 +182,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
     Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
     for (ServerName sn : regionsToScan.values()) {
-      String host = sn.getHostname();
-      DrillbitEndpoint ep = endpointMap.get(host);
+      DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
       if (ep != null) {
         EndpointAffinity affinity = affinityMap.get(ep);
         if (affinity == null) {
@@ -176,9 +192,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
         }
       }
     }
-    this.endpointAffinities = Lists.newArrayList(affinityMap.values());
-    logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
-    return this.endpointAffinities;
+    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
+    return Lists.newArrayList(affinityMap.values());
   }
 
   /**
@@ -189,42 +204,135 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
     watch.start();
-    Preconditions.checkArgument(incomingEndpoints.size() <= regionsToScan.size(),
-        String.format("Incoming endpoints %d is greater than number of row groups %d", incomingEndpoints.size(), regionsToScan.size()));
 
-    mappings = ArrayListMultimap.create();
-    ArrayListMultimap<String, Integer> incomingEndpointMap = ArrayListMultimap.create();
-    for (int i = 0; i < incomingEndpoints.size(); i++) {
-      incomingEndpointMap.put(incomingEndpoints.get(i).getAddress(), i);
+    final int numSlots = incomingEndpoints.size();
+    Preconditions.checkArgument(numSlots <= regionsToScan.size(),
+        String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
+
+    /*
+     * Minimum/maximum number of assignments per slot
+     */
+    final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots);
+    final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots);
+
+    /*
+     * initialize (endpoint index => HBaseSubScanSpec list) map
+     */
+    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+
+    /*
+     * a second map from endpoint hostname to that host's index positions in the 'incomingEndpoints' list
+     */
+    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+    /*
+     * Initialize these two maps
+     */
+    for (int i = 0; i < numSlots; ++i) {
+      endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      String hostname = incomingEndpoints.get(i).getAddress();
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+      if (hostIndexQueue == null) {
+        hostIndexQueue = Lists.newLinkedList();
+        endpointHostIndexListMap.put(hostname, hostIndexQueue);
+      }
+      hostIndexQueue.add(i);
     }
-    Map<String, Iterator<Integer>> mapIterator = new HashMap<String, Iterator<Integer>>();
-    for (String s : incomingEndpointMap.keySet()) {
-      Iterator<Integer> ints = Iterators.cycle(incomingEndpointMap.get(s));
-      mapIterator.put(s, ints);
+
+    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+
+    /*
+     * First, we assign regions which are hosted on region servers running on drillbit endpoints
+     */
+    for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+      Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next();
+      /*
+       * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
+       */
+      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      if (endpointIndexlist != null) {
+        Integer slotIndex = endpointIndexlist.poll();
+        List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+        endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+        // add to the tail of the slot list, to add more later in round robin fashion
+        endpointIndexlist.offer(slotIndex);
+        // this region has been assigned
+        regionsIterator.remove();
+      }
     }
-    Iterator<Integer> nullIterator = Iterators.cycle(incomingEndpointMap.values());
-    for (HRegionInfo regionInfo : regionsToScan.keySet()) {
-      logger.debug("creating read entry. start key: {} end key: {}", Bytes.toStringBinary(regionInfo.getStartKey()), Bytes.toStringBinary(regionInfo.getEndKey()));
-      HBaseSubScan.HBaseSubScanSpec p = new HBaseSubScan.HBaseSubScanSpec()
-          .setTableName(hbaseScanSpec.getTableName())
-          .setStartRow((hbaseScanSpec.getStartRow() != null && regionInfo.containsRow(hbaseScanSpec.getStartRow())) ? hbaseScanSpec.getStartRow() : regionInfo.getStartKey())
-          .setStopRow((hbaseScanSpec.getStopRow() != null && regionInfo.containsRow(hbaseScanSpec.getStopRow())) ? hbaseScanSpec.getStopRow() : regionInfo.getEndKey())
-          .setSerializedFilter(hbaseScanSpec.getSerializedFilter());
-      String host = regionsToScan.get(regionInfo).getHostname();
-      Iterator<Integer> indexIterator = mapIterator.get(host);
-      if (indexIterator == null) {
-        indexIterator = nullIterator;
+
+    /*
+     * Build two priority queues of slots: one with slots holding fewer tasks than 'minPerEndpointSlot', another with slots holding more.
+     */
+    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+      if (listOfScan.size() < minPerEndpointSlot) {
+        minHeap.offer(listOfScan);
+      } else if (listOfScan.size() > minPerEndpointSlot){
+        maxHeap.offer(listOfScan);
       }
-      mappings.put(indexIterator.next(), p);
     }
+
+    /*
+     * Now process any regions which remain unassigned, giving each to the slot with the fewest assignments.
+     */
+    if (regionsToAssignSet.size() > 0) {
+      for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) {
+        List<HBaseSubScanSpec> smallestList = minHeap.poll();
+        smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+        if (smallestList.size() < minPerEndpointSlot) {
+          minHeap.offer(smallestList);
+        }
+      }
+    }
+
+    /*
+     * While some slots have fewer than 'minPerEndpointSlot' units of work, rebalance from those with more.
+     */
+    while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
+      List<HBaseSubScanSpec> smallestList = minHeap.poll();
+      List<HBaseSubScanSpec> largestList = maxHeap.poll();
+      smallestList.add(largestList.remove(largestList.size()-1));
+      if (largestList.size() > minPerEndpointSlot) {
+        maxHeap.offer(largestList);
+      }
+      if (smallestList.size() < minPerEndpointSlot) {
+        minHeap.offer(smallestList);
+      }
+    }
+
+    /* no slot should be empty at this point */
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
+        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
+        incomingEndpoints, endpointFragmentMapping.toString());
+
+    logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+  }
+
+  private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
+    HBaseScanSpec spec = hbaseScanSpec;
+    return new HBaseSubScanSpec()
+        .setTableName(spec.getTableName())
+        .setRegionServer(regionsToScan.get(ri).getHostname())
+        .setStartRow((!isNullOrEmpty(spec.getStartRow()) && ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey())
+        .setStopRow((!isNullOrEmpty(spec.getStopRow()) && ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey())
+        .setSerializedFilter(spec.getSerializedFilter());
+  }
+
+  private boolean isNullOrEmpty(byte[] key) {
+    return key == null || key.length == 0;
   }
 
   @Override
   public HBaseSubScan getSpecificScan(int minorFragmentId) {
-    return new HBaseSubScan(storagePlugin, storagePluginConfig, mappings.get(minorFragmentId), columns);
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
-
   @Override
   public int getMaxParallelizationWidth() {
     return regionsToScan.size();
@@ -313,4 +421,27 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     return filterPushedDown;
   }
 
+  /**
+   * Empty constructor, do not use, only for testing.
+   */
+  @VisibleForTesting
+  public HBaseGroupScan() { }
+
+  /**
+   * Do not use, only for testing.
+   */
+  @VisibleForTesting
+  public void setHBaseScanSpec(HBaseScanSpec hbaseScanSpec) {
+    this.hbaseScanSpec = hbaseScanSpec;
+  }
+
+  /**
+   * Do not use, only for testing.
+   */
+  @JsonIgnore
+  @VisibleForTesting
+  public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> regionsToScan) {
+    this.regionsToScan = regionsToScan;
+  }
+
 }
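
The rewritten applyAssignments() is a three-phase greedy balancer: (1) assign each region to a fragment slot on its own RegionServer host where possible, (2) hand the remaining regions to the least-loaded slots, then (3) shuttle work from the fullest slots to any still below the per-slot minimum. A self-contained sketch of phase 3 using the same min-heap/max-heap idea (illustrative names, not the Drill API):

    import java.util.*;

    class RebalanceSketch {
      // Move items from the largest lists to the smallest until every
      // list holds at least minPerSlot items (or no donors remain).
      static void rebalance(List<List<String>> slots, int minPerSlot) {
        PriorityQueue<List<String>> minHeap =
            new PriorityQueue<List<String>>((a, b) -> a.size() - b.size());
        PriorityQueue<List<String>> maxHeap =
            new PriorityQueue<List<String>>((a, b) -> b.size() - a.size());
        for (List<String> slot : slots) {
          if (slot.size() < minPerSlot) minHeap.offer(slot);
          else if (slot.size() > minPerSlot) maxHeap.offer(slot);
        }
        while (!minHeap.isEmpty() && minHeap.peek().size() < minPerSlot
            && !maxHeap.isEmpty()) {
          List<String> smallest = minHeap.poll();
          List<String> largest = maxHeap.poll();
          smallest.add(largest.remove(largest.size() - 1));
          if (largest.size() > minPerSlot) maxHeap.offer(largest);
          if (smallest.size() < minPerSlot) minHeap.offer(smallest);
        }
      }
    }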

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index caee8ed..204d486 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -30,8 +30,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -58,7 +56,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
-  
+
   private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index d9f2b7c..fafe9c9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -125,12 +125,14 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   public static class HBaseSubScanSpec {
 
     protected String tableName;
+    protected String regionServer;
     protected byte[] startRow;
     protected byte[] stopRow;
     protected byte[] serializedFilter;
 
     @parquet.org.codehaus.jackson.annotate.JsonCreator
     public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
+                            @JsonProperty("regionServer") String regionServer,
                             @JsonProperty("startRow") byte[] startRow,
                             @JsonProperty("stopRow") byte[] stopRow,
                             @JsonProperty("serializedFilter") byte[] serializedFilter,
@@ -139,6 +141,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
         throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
       }
       this.tableName = tableName;
+      this.regionServer = regionServer;
       this.startRow = startRow;
       this.stopRow = stopRow;
       if (serializedFilter != null) {
@@ -170,6 +173,15 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
       return this;
     }
 
+    public String getRegionServer() {
+      return regionServer;
+    }
+
+    public HBaseSubScanSpec setRegionServer(String regionServer) {
+      this.regionServer = regionServer;
+      return this;
+    }
+
     public byte[] getStartRow() {
       return startRow;
     }
@@ -204,7 +216,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
           + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
           + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
           + ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString())
-          + "]";
+          + ", regionServer=" + regionServer + "]";
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 96f0c4a..dbeced3 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
+import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
 import org.apache.drill.exec.util.VectorUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,6 +44,10 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   protected static Configuration conf = HBaseConfiguration.create();
 
+  protected static HBaseStoragePlugin storagePlugin;
+
+  protected static HBaseStoragePluginConfig storagePluginConfig;
+
   @Rule public TestName TEST_NAME = new TestName();
 
   private int[] columnWidths = new int[] { 8 };
@@ -58,11 +63,13 @@ public class BaseHBaseTest extends BaseTestQuery {
      * Change the following to HBaseTestsSuite.configure(false, true)
      * if you want to test against an externally running HBase cluster.
      */
-    HBaseTestsSuite.configure(false, true);
-
+    HBaseTestsSuite.configure(true, true);
     HBaseTestsSuite.initCluster();
-    HBaseStoragePlugin plugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
-    plugin.getConfig().setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
+
+    storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
+    storagePluginConfig = storagePlugin.getConfig();
+
+    storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
   }
 
   @AfterClass
@@ -77,7 +84,7 @@ public class BaseHBaseTest extends BaseTestQuery {
   protected void setColumnWidths(int[] columnWidths) {
     this.columnWidths = columnWidths;
   }
-  
+
   protected String getPlanText(String planFile, String tableName) throws IOException {
     return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
         .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort())

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index e30f79e..67e6f87 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -36,7 +36,9 @@ import org.junit.runners.Suite.SuiteClasses;
 @SuiteClasses({
   HBaseRecordReaderTest.class,
   TestHBaseFilterPushDown.class,
-  TestHBaseProjectPushDown.class})
+  TestHBaseProjectPushDown.class,
+  TestHBaseRegionScanAssignments.class
+})
 public class HBaseTestsSuite {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
 
@@ -147,4 +149,8 @@ public class HBaseTestsSuite {
     HBaseTestsSuite.createTables = createTables;
   }
 
+  public static HBaseAdmin getAdmin() {
+    return admin;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
new file mode 100644
index 0000000..71cb604
--- /dev/null
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.hbase.HBaseGroupScan;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestHBaseRegionScanAssignments extends BaseHBaseTest {
+  static final String HOST_A = "A";
+  static final String HOST_B = "B";
+  static final String HOST_C = "C";
+  static final String HOST_D = "D";
+  static final String HOST_E = "E";
+  static final String HOST_F = "F";
+  static final String HOST_G = "G";
+  static final String HOST_H = "H";
+  static final String HOST_I = "I";
+  static final String HOST_J = "J";
+  static final String HOST_K = "K";
+  static final String HOST_L = "L";
+  static final String HOST_M = "M";
+
+  static final String HOST_X = "X";
+
+  static final String PORT_AND_STARTTIME = ",60020,1400265190186";
+
+  static final ServerName SERVER_A = new ServerName(HOST_A + PORT_AND_STARTTIME);
+  static final ServerName SERVER_B = new ServerName(HOST_B + PORT_AND_STARTTIME);
+  static final ServerName SERVER_C = new ServerName(HOST_C + PORT_AND_STARTTIME);
+  static final ServerName SERVER_D = new ServerName(HOST_D + PORT_AND_STARTTIME);
+  static final ServerName SERVER_E = new ServerName(HOST_E + PORT_AND_STARTTIME);
+  static final ServerName SERVER_F = new ServerName(HOST_F + PORT_AND_STARTTIME);
+  static final ServerName SERVER_G = new ServerName(HOST_G + PORT_AND_STARTTIME);
+  static final ServerName SERVER_H = new ServerName(HOST_H + PORT_AND_STARTTIME);
+  static final ServerName SERVER_I = new ServerName(HOST_I + PORT_AND_STARTTIME);
+
+  static final ServerName SERVER_X = new ServerName(HOST_X + PORT_AND_STARTTIME);
+
+  static final byte[][] splits = {{},
+    "10".getBytes(), "15".getBytes(), "20".getBytes(), "25".getBytes(), "30".getBytes(), "35".getBytes(),
+    "40".getBytes(), "45".getBytes(), "50".getBytes(), "55".getBytes(), "60".getBytes(), "65".getBytes(),
+    "70".getBytes(), "75".getBytes(), "80".getBytes(), "85".getBytes(), "90".getBytes(), "95".getBytes()};
+
+  static final String TABLE_NAME = "TestTable";
+  static final byte[] TABLE_NAME_BYTES = TABLE_NAME.getBytes();
+
+  /**
+   * Has the same name as the {@link BeforeClass} method of the parent class so
+   * that we do not start the MiniHBase cluster, which is not required for these tests.
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // do nothing
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentMix() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build();
+    endpoints.add(DB_A);
+    endpoints.add(DB_A);
+    final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build();
+    endpoints.add(DB_B);
+    final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build();
+    endpoints.add(DB_D);
+    final DrillbitEndpoint DB_X = DrillbitEndpoint.newBuilder().setAddress(HOST_X).setControlPort(1234).build();
+    endpoints.add(DB_X);
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'X'
+    testParallelizationWidth(scan, i);
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentSomeAfinedWithOrphans() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[9]), SERVER_E);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[9], splits[10]), SERVER_E);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[10], splits[11]), SERVER_F);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[11], splits[12]), SERVER_F);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[12], splits[13]), SERVER_G);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[13], splits[14]), SERVER_G);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[14], splits[15]), SERVER_H);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[15], splits[16]), SERVER_H);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[16], splits[17]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[17], splits[0]), SERVER_A);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_I).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_J).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_K).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_L).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_M).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    LinkedList<Integer> sizes = Lists.newLinkedList();
+    sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1);
+    sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2);
+    for (int i = 0; i < endpoints.size(); i++) {
+      assertTrue(sizes.remove((Integer)scan.getSpecificScan(i).getRegionScanSpecList().size()));
+    }
+    assertEquals(0, sizes.size());
+    testParallelizationWidth(scan, endpoints.size());
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentOneEach() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[0]), SERVER_A);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H'
+    testParallelizationWidth(scan, i);
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentNoAfinity() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_X);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_X);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+    endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G'
+    assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H'
+    testParallelizationWidth(scan, i);
+  }
+
+  @Test
+  public void testHBaseGroupScanAssignmentAllPreferred() throws Exception {
+    NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D);
+    regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D);
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build();
+    endpoints.add(DB_A);
+    final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build();
+    endpoints.add(DB_B);
+    final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build();
+    endpoints.add(DB_D);
+    final DrillbitEndpoint DB_X = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build();
+    endpoints.add(DB_X);
+
+    HBaseGroupScan scan = new HBaseGroupScan();
+    scan.setRegionsToScan(regionsToScan);
+    scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+    scan.applyAssignments(endpoints);
+
+    int i = 0;
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+    assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+    testParallelizationWidth(scan, i);
+  }
+
+  private void testParallelizationWidth(HBaseGroupScan scan, int i) {
+    try {
+      scan.getSpecificScan(i);
+      fail("Should not have " + i + "th assignment or you have not enabled Java assertion.");
+    } catch (AssertionError e) { }
+  }
+}


[4/6] git commit: DRILL-818: Fix output type while adding date and interval types.

DRILL-818: Fix output type while adding date and interval types.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/292765e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/292765e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/292765e7

Branch: refs/heads/master
Commit: 292765e7211d6ddddb64e67bdcff64829a60a615
Parents: d52e325
Author: Mehant Baid <me...@gmail.com>
Authored: Sun Jun 1 15:29:32 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:13:07 2014 -0700

----------------------------------------------------------------------
 .../DateIntervalArithmeticFunctions.java             |  4 ++++
 .../drill/exec/record/vector/TestDateTypes.java      |  2 +-
 .../org/apache/drill/jdbc/test/TestJdbcQuery.java    | 15 +++++++++++++--
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
index 9103a86..4fbfdf9 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
@@ -88,7 +88,11 @@ public class ${datetype}${intervaltype}Functions {
     @Param ${datetype}Holder left;
     @Param ${intervaltype}Holder right;
     @Workspace org.joda.time.MutableDateTime temp;
+    <#if datetype == "Date" && (intervaltype.startsWith("Interval"))>
+    @Output TimeStampHolder out;
+    <#else>
     @Output ${datetype}Holder out;
+    </#if>
 
         public void setup(RecordBatch incoming) {
             <#if datetype == "TimeStampTZ">
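
The template change encodes a type-promotion rule: Date plus an interval now produces a TIMESTAMP output (TimeStampHolder) rather than a DATE, since interval arithmetic can introduce a time-of-day component that a DATE cannot represent. A quick Joda-Time illustration of the arithmetic the generated code performs (the example itself is ours, not from the template):

    import org.joda.time.DateTimeZone;
    import org.joda.time.MutableDateTime;

    // DATE '2008-02-23' + INTERVAL '1 10:20:30' DAY TO SECOND
    MutableDateTime temp =
        new MutableDateTime(2008, 2, 23, 0, 0, 0, 0, DateTimeZone.UTC);
    temp.addDays(1);
    temp.addHours(10);
    temp.addMinutes(20);
    temp.addSeconds(30);
    // Prints 2008-02-24T10:20:30.000Z: the result carries a time-of-day
    // component, hence the TIMESTAMP output type.
    System.out.println(temp);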

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
index adae024..f4fd7ae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
@@ -293,7 +293,7 @@ public class TestDateTypes extends PopUnitTestBase {
 
                 ValueVector.Accessor accessor = v.getValueVector().getAccessor();
 
-                assertEquals((accessor.getObject(0).toString()), ("2008-03-27"));
+                assertEquals((accessor.getObject(0).toString()), ("2008-03-27 00:00:00.000"));
 
 
             }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index e684228..088191c 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -44,6 +44,7 @@ import org.junit.rules.TestRule;
 import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -764,7 +765,9 @@ import static org.junit.Assert.fail;
 
           // show tables on view
           ResultSet resultSet = statement.executeQuery("select date '2008-2-23', time '12:23:34', timestamp '2008-2-23 12:23:34.456', " +
-                                                       "interval '1' year, interval '2' day " +
+                                                       "interval '1' year, interval '2' day, " +
+                                                       "date_add(date '2008-2-23', interval '1 10:20:30' day to second), " +
+                                                       "date_add(date '2010-2-23', 1) " +
                                                        "from cp.`employee.json` limit 1");
 
           java.sql.Date date = resultSet.getDate(1);
@@ -772,9 +775,17 @@ import static org.junit.Assert.fail;
           java.sql.Timestamp ts = resultSet.getTimestamp(3);
           String intervalYear = resultSet.getString(4);
           String intervalDay  = resultSet.getString(5);
+          java.sql.Timestamp ts1 = resultSet.getTimestamp(6);
+          java.sql.Date date1 = resultSet.getDate(7);
+
+          java.sql.Timestamp result = java.sql.Timestamp.valueOf("2008-2-24 10:20:30");
+          java.sql.Date result1 = java.sql.Date.valueOf("2010-2-24");
+          assertEquals(ts1, result);
+          assertEquals(date1, result1);
 
           System.out.println("Date: " + date.toString() + " time: " + time.toString() + " timestamp: " + ts.toString() +
-                             "\ninterval year: " + intervalYear + " intervalDay: " + intervalDay);
+                             "\ninterval year: " + intervalYear + " intervalDay: " + intervalDay +
+                             " date_interval_add: " + ts1.toString() + "date_int_add: " + date1.toString());
 
           statement.close();
           return null;


[2/6] git commit: DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.

Posted by ja...@apache.org.
DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/760cbd42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/760cbd42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/760cbd42

Branch: refs/heads/master
Commit: 760cbd421c131ed43f5011764c7e244b661bd84b
Parents: 623a52e
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed May 28 17:32:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:09 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ScreenCreator.java |  4 +-
 .../BroadcastSenderRootExec.java                | 51 +++++++++++++++-----
 .../exec/planner/physical/PlannerSettings.java  | 11 +++--
 .../apache/drill/exec/record/WritableBatch.java |  6 +++
 .../apache/drill/exec/rpc/data/DataTunnel.java  | 12 ++---
 .../server/options/SystemOptionManager.java     |  1 +
 6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c92633f..9aefbe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -74,6 +74,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     public boolean next() {
       if(!ok){
         stop();
+        context.fail(this.listener.ex);
         return false;
       }
 
@@ -135,7 +136,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     private SendListener listener = new SendListener();
 
     private class SendListener extends BaseRpcOutcomeListener<Ack>{
-
+      volatile RpcException ex; 
 
 
       @Override
@@ -150,6 +151,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         logger.error("Failure while sending data to user.", ex);
         ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
         ok = false;
+        this.ex = ex;
       }
 
     }
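
The pattern here: the RPC callback runs on a network thread, so the listener stashes the RpcException in a volatile field and flips ok; the fragment's synchronous next() loop then surfaces it via context.fail(). A minimal generic sketch of that handoff (a hypothetical class, not Drill code):

    public class AsyncFailureHandoff {
      private volatile boolean ok = true;
      private volatile Exception lastError;

      // invoked on the RPC/network thread
      public void onSendFailed(Exception ex) {
        lastError = ex;
        ok = false;
      }

      // invoked on the fragment executor thread
      public boolean next() {
        if (!ok) {
          fail(lastError); // stand-in for context.fail(listener.ex)
          return false;
        }
        return true; // ...do real work here
      }

      private void fail(Exception ex) {
        throw new RuntimeException("query fragment failed", ex);
      }
    }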

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 0a01583..9c55825 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.broadcastsender;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
@@ -26,13 +28,15 @@ import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.work.ErrorHelper;
 
 /**
  * Broadcast Sender broadcasts incoming batches to all receivers (one or more).
@@ -47,7 +51,6 @@ public class BroadcastSenderRootExec implements RootExec {
   private final ExecProtos.FragmentHandle handle;
   private volatile boolean ok;
   private final RecordBatch incoming;
-  private final DrillRpcFuture[] responseFutures;
 
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
@@ -63,12 +66,12 @@ public class BroadcastSenderRootExec implements RootExec {
       FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
       tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
     }
-    responseFutures = new DrillRpcFuture[destinations.size()];
   }
 
   @Override
   public boolean next() {
     if(!ok) {
+      context.fail(statusHandler.ex);
       return false;
     }
 
@@ -79,24 +82,25 @@ public class BroadcastSenderRootExec implements RootExec {
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
-          responseFutures[i] = tunnels[i].sendRecordBatch(context, b2);
+          tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          statusHandler.sendCount.increment();
         }
 
-        waitAllFutures(false);
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         WritableBatch writableBatch = incoming.getWritableBatch();
+        if (tunnels.length > 1) {
+          writableBatch.retainBuffers(tunnels.length - 1);  
+        }
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
-          if(i > 0) {
-            writableBatch.retainBuffers();
-          }
-          responseFutures[i] = tunnels[i].sendRecordBatch(context, batch);
+          tunnels[i].sendRecordBatch(this.statusHandler, batch);   
+          statusHandler.sendCount.increment();
         }
 
-        return waitAllFutures(true);
+        return ok;
 
       case NOT_YET:
       default:
@@ -104,6 +108,7 @@ public class BroadcastSenderRootExec implements RootExec {
     }
   }
 
+  /*
   private boolean waitAllFutures(boolean haltOnError) {
     for (DrillRpcFuture<?> responseFuture : responseFutures) {
       try {
@@ -124,10 +129,34 @@ public class BroadcastSenderRootExec implements RootExec {
     }
     return true;
   }
-
+*/
+  
   @Override
   public void stop() {
       ok = false;
+      statusHandler.sendCount.waitForSendComplete();
       incoming.cleanup();
   }
+  
+  private StatusHandler statusHandler = new StatusHandler();
+  private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
+    volatile RpcException ex;
+    private final SendingAccountor sendCount = new SendingAccountor();
+    
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      sendCount.decrement();
+      super.success(value, buffer);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      sendCount.decrement();
+      logger.error("Failure while sending data to user.", ex);
+      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+      ok = false;
+      this.ex = ex;
+    }
+  }
+
 }
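
Taken together, BroadcastSenderRootExec now pairs every sendRecordBatch with a sendCount.increment() and decrements from the callback on either outcome, which lets stop() block in waitForSendComplete() until no batches remain in flight. A minimal sketch of that accounting discipline (a hypothetical stand-in, not Drill's SendingAccountor):

    import java.util.concurrent.atomic.AtomicInteger;

    public class SendAccountingSketch {
      private final AtomicInteger inFlight = new AtomicInteger();

      // call once per asynchronous send, before handing off the batch
      public void increment() { inFlight.incrementAndGet(); }

      // call from the RPC callback, on success *and* on failure
      public synchronized void decrement() {
        if (inFlight.decrementAndGet() == 0) {
          notifyAll();
        }
      }

      // block until every outstanding send has been acknowledged
      public synchronized void waitForSendComplete() throws InterruptedException {
        while (inFlight.get() > 0) {
          wait();
        }
      }
    }

Decrementing on failure as well as success matters: otherwise a single failed send would leave stop() waiting forever.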

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 18a32af..ad9fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -22,13 +22,15 @@ import net.hydromatic.optiq.tools.FrameworkContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 
 public class PlannerSettings implements FrameworkContext{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
 
   private int numEndPoints = 0;
   private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
-  private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold
+
+  public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE; 
 
   public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
   public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
@@ -36,7 +38,8 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
   public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
-  public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
+  public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
+  public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000);
 
   public OptionManager options = null;
 
@@ -88,8 +91,8 @@ public class PlannerSettings implements FrameworkContext{
     return options.getOption(BROADCAST.getOptionName()).bool_val;
   }
 
-  public int getBroadcastThreshold() {
-    return broadcastThreshold;
+  public long getBroadcastThreshold() {
+    return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4ff3708..14ade39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,4 +163,10 @@ public class WritableBatch {
       buf.retain();
     }
   }
+  
+  public void retainBuffers(int increment) {
+    for (ByteBuf buf : buffers) {
+      buf.retain(increment);
+    }
+  }
 }
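
retainBuffers(int) exists because one WritableBatch's buffers are shared across all outgoing tunnels and each send path releases its reference once; a single bulk retain(tunnels.length - 1) tops up the count front. A standalone Netty sketch of the reference-count arithmetic (not Drill code; assumes one release() per send):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class RetainSketch {
      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16); // refCnt == 1 (creator's reference)
        int receivers = 3;
        buf.retain(receivers - 1);         // refCnt == 3, one per receiver
        for (int i = 0; i < receivers; i++) {
          buf.release();                   // each send path releases exactly once
        }
        System.out.println(buf.refCnt());  // 0 -> buffer reclaimed, no leak
      }
    }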

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 1dcd89e..98bbeeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -37,22 +37,22 @@ public class DataTunnel {
   }
 
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
-    SendBatch b = new SendBatch(outcomeListener, batch);
+    SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
     manager.runCommand(b);
   }
 
   public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    SendBatchAsync b = new SendBatchAsync(batch, context);
+    SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
     manager.runCommand(b);
     return b.getFuture();
   }
 
 
   
-  public static class SendBatch extends ListeningCommand<Ack, DataClientConnection> {
+  private static class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
     final FragmentWritableBatch batch;
 
-    public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
+    public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
       super(listener);
       this.batch = batch;
     }
@@ -70,11 +70,11 @@ public class DataTunnel {
     
   }
 
-  public static class SendBatchAsync extends FutureBitCommand<Ack, DataClientConnection> {
+  private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
     final FragmentWritableBatch batch;
     final FragmentContext context;
 
-    public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) {
+    public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
       super();
       this.batch = batch;
       this.context = context;
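
The renames make the two send paths self-describing: SendBatchAsyncListen delivers the outcome to a caller-supplied listener, while SendBatchAsyncFuture hands back a DrillRpcFuture the caller can block on. A compile-oriented sketch of choosing between them (the wiring is assumed, not from this commit):

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    import org.apache.drill.exec.record.FragmentWritableBatch;
    import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
    import org.apache.drill.exec.rpc.DrillRpcFuture;
    import org.apache.drill.exec.rpc.RpcException;
    import org.apache.drill.exec.rpc.data.DataTunnel;

    public class SendPathsSketch {
      // Listener path: returns immediately; outcome arrives on a callback thread.
      static void sendWithListener(DataTunnel tunnel, FragmentWritableBatch batch) {
        tunnel.sendRecordBatch(new BaseRpcOutcomeListener<Ack>() {
          @Override
          public void failed(RpcException ex) {
            // record the failure for the fragment's next() loop to surface
          }
        }, batch);
      }

      // Future path: the caller may block until the receiver's Ack arrives.
      static void sendWithFuture(DataTunnel tunnel, FragmentContext context,
                                 FragmentWritableBatch batch) throws Exception {
        DrillRpcFuture<Ack> future = tunnel.sendRecordBatch(context, batch);
        future.get();
      }
    }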

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8d9a68f..3e90eb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -41,6 +41,7 @@ public class SystemOptionManager implements OptionManager{
       PlannerSettings.MERGEJOIN, 
       PlannerSettings.MULTIPHASE,
       PlannerSettings.BROADCAST,
+      PlannerSettings.BROADCAST_THRESHOLD,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR
   };
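
Registering BROADCAST_THRESHOLD here is what makes it user-visible; both knobs should then be adjustable per session. A hedged JDBC sketch (the URL and values are illustrative, and it assumes the ALTER SESSION syntax available in this era):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class BroadcastOptionsSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
             Statement stmt = conn.createStatement()) {
          // broadcast joins are now on by default; turn them off for a session...
          stmt.execute("ALTER SESSION SET `planner.enable_broadcast_join` = false");
          // ...or raise the row-count threshold below which broadcast is considered
          stmt.execute("ALTER SESSION SET `planner.broadcast_threshold` = 100000");
        }
      }
    }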


[5/6] git commit: DRILL-855: Improve work assignment parallelization

Posted by ja...@apache.org.
DRILL-855: Improve work assignment parallelization


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d929faac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d929faac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d929faac

Branch: refs/heads/master
Commit: d929faace270302c5e00e272775be98d5a8f83e1
Parents: 292765e
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Wed May 28 04:56:46 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:14:42 2014 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-override.conf  |  1 +
 .../org/apache/drill/exec/ExecConstants.java    |  1 +
 .../drill/exec/physical/EndpointAffinity.java   | 20 ++++--
 .../drill/exec/planner/SimpleExecPlanner.java   |  8 +--
 .../planner/fragment/SimpleParallelizer.java    | 47 ++++++++++----
 .../drill/exec/planner/fragment/Wrapper.java    | 68 +++++++++++---------
 .../apache/drill/exec/work/foreman/Foreman.java | 29 ++-------
 .../src/main/resources/drill-module.conf        | 16 +++++
 .../drill/exec/pop/TestFragmentChecker.java     |  4 +-
 9 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index a9316a9..da3d094 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -87,6 +87,7 @@ drill.exec: {
   work: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
+    affinity.factor: 1.2,
     executor.threads: 4
   },
   trace: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d9e0833..e66e93c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,6 +44,7 @@ public interface ExecConstants {
   public static final String GLOBAL_MAX_WIDTH = "drill.exec.work.global.max.width";
   public static final String MAX_WIDTH_PER_ENDPOINT = "drill.exec.work.max.width.per.endpoint";
   public static final String EXECUTOR_THREADS = "drill.exec.work.executor.threads";
+  public static final String AFFINITY_FACTOR = "drill.exec.work.affinity.factor";
   public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
   public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
   public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index f3059ae..df31f74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -19,18 +19,20 @@ package org.apache.drill.exec.physical;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
+import com.google.protobuf.TextFormat;
+
 
 public class EndpointAffinity implements Comparable<EndpointAffinity>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
-  
+
   private DrillbitEndpoint endpoint;
   private float affinity = 0.0f;
-  
+
   public EndpointAffinity(DrillbitEndpoint endpoint) {
     super();
     this.endpoint = endpoint;
   }
-  
+
   public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
     super();
     this.endpoint = endpoint;
@@ -46,15 +48,19 @@ public class EndpointAffinity implements Comparable<EndpointAffinity>{
   public float getAffinity() {
     return affinity;
   }
-  
+
   @Override
   public int compareTo(EndpointAffinity o) {
     return Float.compare(affinity, o.affinity);
   }
-  
+
   public void addAffinity(float f){
     affinity += f;
   }
-  
-  
+
+  @Override
+  public String toString() {
+    return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
index 4da6500..132fde7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -51,9 +51,9 @@ public class SimpleExecPlanner implements ExecPlanner{
 
     int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
 
-    return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
-            context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint);
-
-
+    parallelizer.setGlobalMaxWidth(maxWidth).setMaxWidthPerEndpoint(maxWidthPerEndpoint);
+    return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(),
+        context.getQueryId(), context.getActiveEndpoints(), context.getPlanReader(), fragmentRoot, planningSet);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 313a81d..d226b08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -51,6 +51,36 @@ public class SimpleParallelizer {
   private final Materializer materializer = new Materializer();
 
   /**
+   * The maximum level of parallelization any stage of the query can do. Note that while this
+   * might be the number of active Drillbits, realistically, this could be well beyond that
+   * number if we want to do things like speed up results return.
+   */
+  private int globalMaxWidth;
+  public SimpleParallelizer setGlobalMaxWidth(int globalMaxWidth) {
+    this.globalMaxWidth = globalMaxWidth;
+    return this;
+  }
+
+  /**
+   * Limits the maximum level of parallelization to this factor times the number of Drillbits
+   */
+  private int maxWidthPerEndpoint;
+  public SimpleParallelizer setMaxWidthPerEndpoint(int maxWidthPerEndpoint) {
+    this.maxWidthPerEndpoint = maxWidthPerEndpoint;
+    return this;
+  }
+
+
+  /**
+   * Factor by which a node with endpoint affinity will be favored while creating assignments
+   */
+  private double affinityFactor = 1.2f;
+  public SimpleParallelizer setAffinityFactor(double affinityFactor) {
+    this.affinityFactor = affinityFactor;
+    return this;
+  }
+
+  /**
    * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
    * beyond the global max width.
    *
@@ -60,16 +90,12 @@ public class SimpleParallelizer {
    * @param reader          Tool used to read JSON plans
    * @param rootNode        The root node of the PhysicalPlan that we will parallelizing.
    * @param planningSet     The set of queries with collected statistics that we'll work with.
-   * @param globalMaxWidth  The maximum level or parallelization any stage of the query can do. Note that while this
-   *                        might be the number of active Drillbits, realistically, this could be well beyond that
-   *                        number of we want to do things like speed results return.
-   * @param maxWidthPerEndpoint Limits the maximum level of parallelization to this factor time the number of Drillbits
    * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
    * @throws ExecutionSetupException
    */
-  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
-                                    int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException {
-    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint);
+  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
+      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException {
+    assignEndpoints(activeEndpoints, planningSet);
     return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet);
   }
 
@@ -152,11 +178,9 @@ public class SimpleParallelizer {
     }
 
     return new QueryWorkUnit(rootOperator, rootFragment, fragments);
-
   }
 
-  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet,
-                               int globalMaxWidth, int maxWidthPerEndpoint) throws PhysicalOperatorSetupException {
+  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet) throws PhysicalOperatorSetupException {
     // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
     // cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this
     // could be based on endpoint load)
@@ -181,7 +205,8 @@ public class SimpleParallelizer {
 //      logger.debug("Setting width {} on fragment {}", width, wrapper);
       wrapper.setWidth(width);
       // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
-      wrapper.assignEndpoints(allNodes);
+      wrapper.assignEndpoints(allNodes, affinityFactor);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8602bf0..38cba09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.planner.fragment;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -47,7 +51,7 @@ public class Wrapper {
   private int width = -1;
   private final Stats stats;
   private boolean endpointsAssigned;
-  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap = Maps.newHashMap();
   private long initialAllocation = 0;
   private long maxAllocation = 0;
 
@@ -71,14 +75,14 @@ public class Wrapper {
       addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
     }
   }
-  
+
   public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
     Preconditions.checkState(!endpointsAssigned);
     Preconditions.checkNotNull(endpoint);
-    EndpointAffinity ea = endpointAffinity.get(endpoint);
+    EndpointAffinity ea = endpointAffinityMap.get(endpoint);
     if (ea == null) {
       ea = new EndpointAffinity(endpoint);
-      endpointAffinity.put(endpoint, ea);
+      endpointAffinityMap.put(endpoint, ea);
     }
 
     ea.addAffinity(affinity);
@@ -116,7 +120,6 @@ public class Wrapper {
 
   private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
 
-    
     @Override
     public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
       if(exchange == node.getSendingExchange()){
@@ -148,40 +151,42 @@ public class Wrapper {
     public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
       return visitChildren(op, value);
     }
-    
+
   }
-  
-  public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws PhysicalOperatorSetupException {
-    Preconditions.checkState(!endpointsAssigned);
 
+  public void assignEndpoints(Collection<DrillbitEndpoint> allEndpoints, double affinityFactor) throws PhysicalOperatorSetupException {
+    Preconditions.checkState(!endpointsAssigned);
     endpointsAssigned = true;
 
-    List<EndpointAffinity> values = Lists.newArrayList();
-    values.addAll(endpointAffinity.values());
-
-    if (values.size() == 0) {
-      List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
-      final int div = allPossible.size();
-      int start = ThreadLocalRandom.current().nextInt(div);
-      // round robin with random start.
-      for (int i = start; i < start + width; i++) {
-        Preconditions.checkNotNull(all.get(i % div));
-        endpoints.add(all.get(i % div));
-      }
-    } else {
+    if (endpointAffinityMap.size() > 0) {
+      List<EndpointAffinity> affinedEPs = Lists.newArrayList(endpointAffinityMap.values());
       // get nodes with highest affinity.
-      Collections.sort(values);
-      values = Lists.reverse(values);
-      for (int i = 0; i < width; i++) {
-        Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
-        endpoints.add(values.get(i%values.size()).getEndpoint());
+      Collections.sort(affinedEPs);
+      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(Lists.reverse(affinedEPs));
+      /** Maximum number of slots which should go to endpoints with affinity */
+      int affinedSlots = Math.min((Math.max(1, (int) (affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width);
+      while(endpoints.size() < affinedSlots) {
+        EndpointAffinity ea = affinedEPItr.next();
+        DrillbitEndpoint endpoint = ea.getEndpoint();
+        endpoints.add(endpoint);
+      }
+    }
+    // add other endpoints if required
+    if (endpoints.size() < width) {
+      List<DrillbitEndpoint> all = Lists.newArrayList(allEndpoints);
+      all.removeAll(endpointAffinityMap.keySet());
+      // round robin with random start.
+      Collections.shuffle(all, ThreadLocalRandom.current());
+      Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(all.size() > 0 ? all : endpointAffinityMap.keySet());
+      while (endpoints.size() < width) {
+        endpoints.add(otherEPItr.next());
       }
     }
 
     // Set scan and store endpoints.
     AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
     node.getRoot().accept(visitor, endpoints);
-    
+
     // Set the endpoints for this (one at most) sending exchange.
     if (node.getSendingExchange() != null) {
       node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
@@ -202,4 +207,5 @@ public class Wrapper {
     Preconditions.checkState(endpointsAssigned);
     return this.endpoints.get(minorFragmentId);
   }
+
 }
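
The affinedSlots expression above is easier to read with numbers plugged in. A worked sketch with assumed values (the width, endpoint counts, and factor are illustrative):

    public class AffinedSlotsSketch {
      public static void main(String[] args) {
        double affinityFactor = 1.2; // drill.exec.work.affinity.factor
        int width = 10;              // minor fragments to place
        int allEndpoints = 4;        // active Drillbits
        int affinedEndpoints = 2;    // Drillbits holding relevant data

        int affinedSlots = Math.min(
            Math.max(1, (int) (affinityFactor * width / allEndpoints)) * affinedEndpoints,
            width);
        // (int)(1.2 * 10 / 4) = 3 slots per affined node, times 2 nodes = 6
        System.out.println(affinedSlots); // 6 of 10 slots go to affined nodes;
                                          // the remaining 4 are round-robined
      }
    }

So raising affinity.factor shifts more of a fragment's width onto the nodes that already hold its data.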

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index eb1d738..b8edb84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -121,7 +121,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     cleanupAndSendResult(result);
   }
 
-
   public void cancel() {
     if(isFinished()){
       return;
@@ -148,8 +147,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
-
-
   /**
    * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
    */
@@ -160,10 +157,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     // convert a run query request into action
     try{
       switch (queryRequest.getType()) {
-
       case LOGICAL:
         parseAndRunLogicalPlan(queryRequest.getPlan());
-
         break;
       case PHYSICAL:
         parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -185,7 +180,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
-
   private void parseAndRunLogicalPlan(String json) {
 
     try {
@@ -249,8 +243,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   }
 
-
-
   private void parseAndRunPhysicalPlan(String json) {
     try {
       PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -260,7 +252,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
-
   private void runPhysicalPlan(PhysicalPlan plan) {
 
     if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -278,12 +269,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
 
     PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
-    SimpleParallelizer parallelizer = new SimpleParallelizer();
+    SimpleParallelizer parallelizer = new SimpleParallelizer()
+      .setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH))
+      .setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT))
+      .setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR));
 
     try {
-      QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
-              context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
-              context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
+      QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(),
+          queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet);
 
       this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();
@@ -292,17 +285,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       // store fragments in distributed grid.
       logger.debug("Storing fragments");
       for (PlanFragment f : work.getFragments()) {
-
         // store all fragments in grid since they are part of handshake.
-
         context.getCache().storeFragment(f);
         if (f.getLeafFragment()) {
           leafFragments.add(f);
         } else {
           intermediateFragments.add(f);
         }
-
-
       }
 
       logger.debug("Fragments stored.");
@@ -311,7 +300,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
       logger.debug("Fragments running.");
 
-
     } catch (ExecutionSetupException | RpcException e) {
       fail("Failure while setting up query.", e);
     }
@@ -352,7 +340,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     return this.state.getState();
   }
 
-
   class ForemanManagerListener{
     void fail(String message, Throwable t) {
       ForemanManagerListener.this.fail(message, t);
@@ -364,13 +351,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   }
 
-
-
   @Override
   public int compareTo(Object o) {
     return o.hashCode() - o.hashCode();
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 26205bd..f8396bb 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 //  This file tells Drill to consider this module when class path scanning.  
 //  This file can also include any supplementary configuration information.  
 //  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
@@ -76,6 +91,7 @@ drill.exec: {
   work: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
+    affinity.factor: 1.2,
     executor.threads: 4
   },
   trace: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 1b38dce..ea19351 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -59,8 +59,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
-
-    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+    par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5);
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
 
     System.out.print(qwu.getRootFragment().getFragmentJson());