You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/02 18:51:22 UTC
[1/6] git commit: DRILL-864: MergeJoinBatch fails to set record count
in ValueVectors in container
Repository: incubator-drill
Updated Branches:
refs/heads/master e1e5ea0ed -> 8490d7433
DRILL-864: MergeJoinBatch fails to set record count in ValueVectors in container
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/623a52e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/623a52e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/623a52e1
Branch: refs/heads/master
Commit: 623a52e11e94cdfeac1bb734890ec7155cc760d5
Parents: e1e5ea0
Author: vkorukanti <ve...@gmail.com>
Authored: Thu May 29 17:23:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:11:39 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/join/MergeJoinBatch.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/623a52e1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 121cfec..b284454 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
import java.io.IOException;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -181,18 +182,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
case BATCH_RETURNED:
// only return new schema if new worker has been setup.
logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
+ setRecordCountInContainer();
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
case FAILURE:
kill();
return IterOutcome.STOP;
case NO_MORE_DATA:
logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+ setRecordCountInContainer();
return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
case SCHEMA_CHANGED:
worker = null;
if(status.getOutPosition() > 0){
// if we have current data, let's return that.
logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
+ setRecordCountInContainer();
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
}else{
// loop again to rebuild worker.
@@ -210,6 +214,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
}
+ private void setRecordCountInContainer() {
+ for(VectorWrapper vw : container){
+ Preconditions.checkArgument(!vw.isHyper());
+ vw.getValueVector().getMutator().setValueCount(getRecordCount());
+ }
+ }
+
public void resetBatchBuilder() {
batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
}
[3/6] git commit: DRILL-874 : Fix code to handle splitAndTransferTo
in BitVector, when copy is required.
Posted by ja...@apache.org.
DRILL-874 : Fix code to handle splitAndTransferTo in BitVector, when copy is required.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d52e3251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d52e3251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d52e3251
Branch: refs/heads/master
Commit: d52e3251ba30edb7a5ab2fcc0e876fa8e1125bfd
Parents: 760cbd4
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Sun Jun 1 11:06:29 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:39 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/vector/BitVector.java | 29 +++++++++++---------
1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d52e3251/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 14324b0..e217ddb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -60,11 +60,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
private int getSizeFromCount(int valueCount) {
- return (int) Math.ceil((float)valueCount / 8.0);
+ return (int) Math.ceil(valueCount / 8.0);
}
private int getByteIndex(int index) {
- return (int) Math.floor((float) index / 8.0);
+ return (int) Math.floor(index / 8.0);
}
public void allocateNew() {
@@ -174,27 +174,30 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
public void splitAndTransferTo(int startIndex, int length, BitVector target) {
assert startIndex + length <= valueCount;
int firstByte = getByteIndex(startIndex);
- int lastByte = getSizeFromCount(startIndex + length) - 1;
+ int byteSize = getSizeFromCount(length);
int offset = startIndex % 8;
if (offset == 0) {
// slice
- target.data = this.data.slice(firstByte, lastByte - firstByte + 1);
+ target.data = this.data.slice(firstByte, byteSize);
target.data.retain();
} else {
// Copy data
+ // When the first bit starts from the middle of a byte (offset != 0), copy data from the src BitVector.
+ // Each byte in the target is composed of a part of the i-th byte and a part of the (i+1)-th byte.
+ // The last byte copied to the target is a bit tricky:
+ // 1) if the length requires a partial byte (length % 8 != 0), copy only the remaining bits.
+ // 2) otherwise, copy the last byte in the same way as the prior bytes.
target.clear();
target.allocateNew(length);
- if ((startIndex + length) % 8 == 0) {
- lastByte++;
- }
- int i = firstByte;
// TODO maybe do this one word at a time, rather than byte?
- for (; i <= lastByte - 1; i++) {
- target.data.setByte(i - firstByte, (((this.data.getByte(i) & 0xFF) >>> offset) + (this.data.getByte(i + 1) << (8 - offset))));
- }
- if (startIndex + length == this.valueCount) {
- target.data.setByte(i - firstByte, ((this.data.getByte(lastByte) & 0xFF) >>> offset));
+ for (int i = 0; i < byteSize - 1; i++) {
+ target.data.setByte(i, (((this.data.getByte(firstByte + i) & 0xFF) >>> offset) + (this.data.getByte(firstByte + i + 1) << (8 - offset))));
}
+ if (length % 8 != 0)
+ target.data.setByte(byteSize - 1, ((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset));
+ else
+ target.data.setByte(byteSize - 1,
+ (((this.data.getByte(firstByte + byteSize - 1) & 0xFF) >>> offset) + (this.data.getByte(firstByte + byteSize) << (8 - offset))));
}
}
[6/6] git commit: DRILL-672: Queries against HBase table do not close
after the data is returned.
Posted by ja...@apache.org.
DRILL-672: Queries against HBase table do not close after the data is returned.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8490d743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8490d743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8490d743
Branch: refs/heads/master
Commit: 8490d7433f9d9171971ae6e1af02cb67215cd8ce
Parents: d929faa
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Wed May 28 04:57:07 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:15:29 2014 -0700
----------------------------------------------------------------------
.../drill/exec/store/hbase/HBaseGroupScan.java | 215 ++++++++++---
.../exec/store/hbase/HBaseRecordReader.java | 4 +-
.../drill/exec/store/hbase/HBaseSubScan.java | 14 +-
.../org/apache/drill/hbase/BaseHBaseTest.java | 17 +-
.../org/apache/drill/hbase/HBaseTestsSuite.java | 8 +-
.../hbase/TestHBaseRegionScanAssignments.java | 299 +++++++++++++++++++
6 files changed, 505 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 809aa86..f3ff64c 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -18,12 +18,18 @@
package org.apache.drill.exec.store.hbase;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -38,12 +44,12 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
@@ -51,16 +57,26 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
@JsonTypeName("hbase-scan")
public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+ private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
+ @Override
+ public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
+ return list1.size() - list2.size();
+ }
+ };
+
+ private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
private HBaseStoragePluginConfig storagePluginConfig;
private List<SchemaPath> columns;
@@ -70,9 +86,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
private HBaseStoragePlugin storagePlugin;
private Stopwatch watch = new Stopwatch();
- private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings;
- private List<EndpointAffinity> endpointAffinities;
- private NavigableMap<HRegionInfo,ServerName> regionsToScan;
+
+ private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
+
+ private NavigableMap<HRegionInfo, ServerName> regionsToScan;
+
private HTableDescriptor hTableDesc;
private boolean filterPushedDown = false;
@@ -85,9 +103,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
}
- public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
- this.storagePlugin = storageEngine;
- this.storagePluginConfig = storageEngine.getConfig();
+ public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+ this.storagePlugin = storagePlugin;
+ this.storagePluginConfig = storagePlugin.getConfig();
this.hbaseScanSpec = scanSpec;
this.columns = columns;
init();
@@ -95,13 +113,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
/**
* Private constructor, used for cloning.
- * @param that The
+ * @param that The HBaseGroupScan to clone
*/
private HBaseGroupScan(HBaseGroupScan that) {
this.columns = that.columns;
- this.endpointAffinities = that.endpointAffinities;
this.hbaseScanSpec = that.hbaseScanSpec;
- this.mappings = that.mappings;
+ this.endpointFragmentMapping = that.endpointFragmentMapping;
this.regionsToScan = that.regionsToScan;
this.storagePlugin = that.storagePlugin;
this.storagePluginConfig = that.storagePluginConfig;
@@ -165,8 +182,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
for (ServerName sn : regionsToScan.values()) {
- String host = sn.getHostname();
- DrillbitEndpoint ep = endpointMap.get(host);
+ DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
if (ep != null) {
EndpointAffinity affinity = affinityMap.get(ep);
if (affinity == null) {
@@ -176,9 +192,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
}
}
}
- this.endpointAffinities = Lists.newArrayList(affinityMap.values());
- logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
- return this.endpointAffinities;
+ logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
+ return Lists.newArrayList(affinityMap.values());
}
/**
@@ -189,42 +204,135 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
watch.reset();
watch.start();
- Preconditions.checkArgument(incomingEndpoints.size() <= regionsToScan.size(),
- String.format("Incoming endpoints %d is greater than number of row groups %d", incomingEndpoints.size(), regionsToScan.size()));
- mappings = ArrayListMultimap.create();
- ArrayListMultimap<String, Integer> incomingEndpointMap = ArrayListMultimap.create();
- for (int i = 0; i < incomingEndpoints.size(); i++) {
- incomingEndpointMap.put(incomingEndpoints.get(i).getAddress(), i);
+ final int numSlots = incomingEndpoints.size();
+ Preconditions.checkArgument(numSlots <= regionsToScan.size(),
+ String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
+
+ /*
+ * Minimum/Maximum number of assignment per slot
+ */
+ final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots);
+ final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots);
+
+ /*
+ * initialize (endpoint index => HBaseSubScanSpec list) map
+ */
+ endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+
+ /*
+ * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
+ */
+ Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+ /*
+ * Initialize these two maps
+ */
+ for (int i = 0; i < numSlots; ++i) {
+ endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+ String hostname = incomingEndpoints.get(i).getAddress();
+ Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+ if (hostIndexQueue == null) {
+ hostIndexQueue = Lists.newLinkedList();
+ endpointHostIndexListMap.put(hostname, hostIndexQueue);
+ }
+ hostIndexQueue.add(i);
}
- Map<String, Iterator<Integer>> mapIterator = new HashMap<String, Iterator<Integer>>();
- for (String s : incomingEndpointMap.keySet()) {
- Iterator<Integer> ints = Iterators.cycle(incomingEndpointMap.get(s));
- mapIterator.put(s, ints);
+
+ Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+
+ /*
+ * First, we assign regions which are hosted on region servers running on drillbit endpoints
+ */
+ for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+ Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next();
+ /*
+ * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
+ */
+ Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+ if (endpointIndexlist != null) {
+ Integer slotIndex = endpointIndexlist.poll();
+ List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+ endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+ // add to the tail of the slot list, to add more later in round robin fashion
+ endpointIndexlist.offer(slotIndex);
+ // this region has been assigned
+ regionsIterator.remove();
+ }
}
- Iterator<Integer> nullIterator = Iterators.cycle(incomingEndpointMap.values());
- for (HRegionInfo regionInfo : regionsToScan.keySet()) {
- logger.debug("creating read entry. start key: {} end key: {}", Bytes.toStringBinary(regionInfo.getStartKey()), Bytes.toStringBinary(regionInfo.getEndKey()));
- HBaseSubScan.HBaseSubScanSpec p = new HBaseSubScan.HBaseSubScanSpec()
- .setTableName(hbaseScanSpec.getTableName())
- .setStartRow((hbaseScanSpec.getStartRow() != null && regionInfo.containsRow(hbaseScanSpec.getStartRow())) ? hbaseScanSpec.getStartRow() : regionInfo.getStartKey())
- .setStopRow((hbaseScanSpec.getStopRow() != null && regionInfo.containsRow(hbaseScanSpec.getStopRow())) ? hbaseScanSpec.getStopRow() : regionInfo.getEndKey())
- .setSerializedFilter(hbaseScanSpec.getSerializedFilter());
- String host = regionsToScan.get(regionInfo).getHostname();
- Iterator<Integer> indexIterator = mapIterator.get(host);
- if (indexIterator == null) {
- indexIterator = nullIterator;
+
+ /*
+ * Build priority queues of slots: one with slots that have fewer tasks than 'minPerEndpointSlot' and another with those that have more.
+ */
+ PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+ for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ if (listOfScan.size() < minPerEndpointSlot) {
+ minHeap.offer(listOfScan);
+ } else if (listOfScan.size() > minPerEndpointSlot){
+ maxHeap.offer(listOfScan);
}
- mappings.put(indexIterator.next(), p);
}
+
+ /*
+ * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
+ */
+ if (regionsToAssignSet.size() > 0) {
+ for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) {
+ List<HBaseSubScanSpec> smallestList = minHeap.poll();
+ smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+ if (smallestList.size() < minPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+ }
+
+ /*
+ * While there are slots with fewer than 'minPerEndpointSlot' units of work, balance from those with more.
+ */
+ while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
+ List<HBaseSubScanSpec> smallestList = minHeap.poll();
+ List<HBaseSubScanSpec> largestList = maxHeap.poll();
+ smallestList.add(largestList.remove(largestList.size()-1));
+ if (largestList.size() > minPerEndpointSlot) {
+ maxHeap.offer(largestList);
+ }
+ if (smallestList.size() < minPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+
+ /* no slot should be empty at this point */
+ assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
+ "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
+ incomingEndpoints, endpointFragmentMapping.toString());
+
+ logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+ watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+ }
+
+ private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
+ HBaseScanSpec spec = hbaseScanSpec;
+ return new HBaseSubScanSpec()
+ .setTableName(spec.getTableName())
+ .setRegionServer(regionsToScan.get(ri).getHostname())
+ .setStartRow((!isNullOrEmpty(spec.getStartRow()) && ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey())
+ .setStopRow((!isNullOrEmpty(spec.getStopRow()) && ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey())
+ .setSerializedFilter(spec.getSerializedFilter());
+ }
+
+ private boolean isNullOrEmpty(byte[] key) {
+ return key == null || key.length == 0;
}
@Override
public HBaseSubScan getSpecificScan(int minorFragmentId) {
- return new HBaseSubScan(storagePlugin, storagePluginConfig, mappings.get(minorFragmentId), columns);
+ assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+ "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+ minorFragmentId);
+ return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
}
-
@Override
public int getMaxParallelizationWidth() {
return regionsToScan.size();
@@ -313,4 +421,27 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
return filterPushedDown;
}
+ /**
+ * Empty constructor, do not use, only for testing.
+ */
+ @VisibleForTesting
+ public HBaseGroupScan() { }
+
+ /**
+ * Do not use, only for testing.
+ */
+ @VisibleForTesting
+ public void setHBaseScanSpec(HBaseScanSpec hbaseScanSpec) {
+ this.hbaseScanSpec = hbaseScanSpec;
+ }
+
+ /**
+ * Do not use, only for testing.
+ */
+ @JsonIgnore
+ @VisibleForTesting
+ public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> regionsToScan) {
+ this.regionsToScan = regionsToScan;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index caee8ed..204d486 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -30,8 +30,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -58,7 +56,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
private static final int TARGET_RECORD_COUNT = 4000;
-
+
private LinkedHashSet<SchemaPath> columns;
private OutputMutator outputMutator;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index d9f2b7c..fafe9c9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -125,12 +125,14 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
public static class HBaseSubScanSpec {
protected String tableName;
+ protected String regionServer;
protected byte[] startRow;
protected byte[] stopRow;
protected byte[] serializedFilter;
@parquet.org.codehaus.jackson.annotate.JsonCreator
public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("regionServer") String regionServer,
@JsonProperty("startRow") byte[] startRow,
@JsonProperty("stopRow") byte[] stopRow,
@JsonProperty("serializedFilter") byte[] serializedFilter,
@@ -139,6 +141,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
}
this.tableName = tableName;
+ this.regionServer = regionServer;
this.startRow = startRow;
this.stopRow = stopRow;
if (serializedFilter != null) {
@@ -170,6 +173,15 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
return this;
}
+ public String getRegionServer() {
+ return regionServer;
+ }
+
+ public HBaseSubScanSpec setRegionServer(String regionServer) {
+ this.regionServer = regionServer;
+ return this;
+ }
+
public byte[] getStartRow() {
return startRow;
}
@@ -204,7 +216,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
+ ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
+ ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
+ ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString())
- + "]";
+ + ", regionServer=" + regionServer + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 96f0c4a..dbeced3 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
+import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
import org.apache.drill.exec.util.VectorUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,6 +44,10 @@ public class BaseHBaseTest extends BaseTestQuery {
protected static Configuration conf = HBaseConfiguration.create();
+ protected static HBaseStoragePlugin storagePlugin;
+
+ protected static HBaseStoragePluginConfig storagePluginConfig;
+
@Rule public TestName TEST_NAME = new TestName();
private int[] columnWidths = new int[] { 8 };
@@ -58,11 +63,13 @@ public class BaseHBaseTest extends BaseTestQuery {
* Change the following to HBaseTestsSuite.configure(false, true)
* if you want to test against an externally running HBase cluster.
*/
- HBaseTestsSuite.configure(false, true);
-
+ HBaseTestsSuite.configure(true, true);
HBaseTestsSuite.initCluster();
- HBaseStoragePlugin plugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
- plugin.getConfig().setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
+
+ storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
+ storagePluginConfig = storagePlugin.getConfig();
+
+ storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
}
@AfterClass
@@ -77,7 +84,7 @@ public class BaseHBaseTest extends BaseTestQuery {
protected void setColumnWidths(int[] columnWidths) {
this.columnWidths = columnWidths;
}
-
+
protected String getPlanText(String planFile, String tableName) throws IOException {
return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
.replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort())
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index e30f79e..67e6f87 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -36,7 +36,9 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({
HBaseRecordReaderTest.class,
TestHBaseFilterPushDown.class,
- TestHBaseProjectPushDown.class})
+ TestHBaseProjectPushDown.class,
+ TestHBaseRegionScanAssignments.class
+})
public class HBaseTestsSuite {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
@@ -147,4 +149,8 @@ public class HBaseTestsSuite {
HBaseTestsSuite.createTables = createTables;
}
+ public static HBaseAdmin getAdmin() {
+ return admin;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8490d743/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
new file mode 100644
index 0000000..71cb604
--- /dev/null
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.hbase.HBaseGroupScan;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestHBaseRegionScanAssignments extends BaseHBaseTest {
+ static final String HOST_A = "A";
+ static final String HOST_B = "B";
+ static final String HOST_C = "C";
+ static final String HOST_D = "D";
+ static final String HOST_E = "E";
+ static final String HOST_F = "F";
+ static final String HOST_G = "G";
+ static final String HOST_H = "H";
+ static final String HOST_I = "I";
+ static final String HOST_J = "J";
+ static final String HOST_K = "K";
+ static final String HOST_L = "L";
+ static final String HOST_M = "M";
+
+ static final String HOST_X = "X";
+
+ static final String PORT_AND_STARTTIME = ",60020,1400265190186";
+
+ static final ServerName SERVER_A = new ServerName(HOST_A + PORT_AND_STARTTIME);
+ static final ServerName SERVER_B = new ServerName(HOST_B + PORT_AND_STARTTIME);
+ static final ServerName SERVER_C = new ServerName(HOST_C + PORT_AND_STARTTIME);
+ static final ServerName SERVER_D = new ServerName(HOST_D + PORT_AND_STARTTIME);
+ static final ServerName SERVER_E = new ServerName(HOST_E + PORT_AND_STARTTIME);
+ static final ServerName SERVER_F = new ServerName(HOST_F + PORT_AND_STARTTIME);
+ static final ServerName SERVER_G = new ServerName(HOST_G + PORT_AND_STARTTIME);
+ static final ServerName SERVER_H = new ServerName(HOST_H + PORT_AND_STARTTIME);
+ static final ServerName SERVER_I = new ServerName(HOST_I + PORT_AND_STARTTIME);
+
+ static final ServerName SERVER_X = new ServerName(HOST_X + PORT_AND_STARTTIME);
+
+ static final byte[][] splits = {{},
+ "10".getBytes(), "15".getBytes(), "20".getBytes(), "25".getBytes(), "30".getBytes(), "35".getBytes(),
+ "40".getBytes(), "45".getBytes(), "50".getBytes(), "55".getBytes(), "60".getBytes(), "65".getBytes(),
+ "70".getBytes(), "75".getBytes(), "80".getBytes(), "85".getBytes(), "90".getBytes(), "95".getBytes()};
+
+ static final String TABLE_NAME = "TestTable";
+ static final byte[] TABLE_NAME_BYTES = TABLE_NAME.getBytes();
+
+ /**
+ * Has the same name as the {@link BeforeClass} method of the parent so that
+ * we do not start MiniHBase cluster as it is not required for these tests.
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // do nothing
+ }
+
+ @Test
+ public void testHBaseGroupScanAssignmentMix() throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_C);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D);
+
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build();
+ endpoints.add(DB_A);
+ endpoints.add(DB_A);
+ final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build();
+ endpoints.add(DB_B);
+ final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build();
+ endpoints.add(DB_D);
+ final DrillbitEndpoint DB_X = DrillbitEndpoint.newBuilder().setAddress(HOST_X).setControlPort(1234).build();
+ endpoints.add(DB_X);
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ int i = 0;
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'X'
+ testParallelizationWidth(scan, i);
+ }
+
+ @Test
+ public void testHBaseGroupScanAssignmentSomeAfinedWithOrphans() throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[9]), SERVER_E);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[9], splits[10]), SERVER_E);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[10], splits[11]), SERVER_F);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[11], splits[12]), SERVER_F);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[12], splits[13]), SERVER_G);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[13], splits[14]), SERVER_G);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[14], splits[15]), SERVER_H);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[15], splits[16]), SERVER_H);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[16], splits[17]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[17], splits[0]), SERVER_A);
+
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_I).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_J).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_K).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_L).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_M).setControlPort(1234).build());
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ LinkedList<Integer> sizes = Lists.newLinkedList();
+ sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1); sizes.add(1);
+ sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2); sizes.add(2);
+ for (int i = 0; i < endpoints.size(); i++) {
+ assertTrue(sizes.remove((Integer)scan.getSpecificScan(i).getRegionScanSpecList().size()));
+ }
+ assertEquals(0, sizes.size());
+ testParallelizationWidth(scan, endpoints.size());
+ }
+
+ @Test
+ public void testHBaseGroupScanAssignmentOneEach() throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[8]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[8], splits[0]), SERVER_A);
+
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ int i = 0;
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H'
+ testParallelizationWidth(scan, i);
+ }
+
+ @Test
+ public void testHBaseGroupScanAssignmentNoAfinity() throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_X);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_X);
+
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_E).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_F).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_G).setControlPort(1234).build());
+ endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_H).setControlPort(1234).build());
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ int i = 0;
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'E'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'F'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'G'
+ assertEquals(1, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'H'
+ testParallelizationWidth(scan, i);
+ }
+
+ @Test
+ public void testHBaseGroupScanAssignmentAllPreferred() throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[0], splits[1]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[1], splits[2]), SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[2], splits[3]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[3], splits[4]), SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[4], splits[5]), SERVER_C);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[5], splits[6]), SERVER_C);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[6], splits[7]), SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME_BYTES, splits[7], splits[0]), SERVER_D);
+
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ final DrillbitEndpoint DB_A = DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build();
+ endpoints.add(DB_A);
+ final DrillbitEndpoint DB_B = DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build();
+ endpoints.add(DB_B);
+ final DrillbitEndpoint DB_D = DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build();
+ endpoints.add(DB_D);
+ final DrillbitEndpoint DB_X = DrillbitEndpoint.newBuilder().setAddress(HOST_D).setControlPort(1234).build();
+ endpoints.add(DB_X);
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME, splits[0], splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ int i = 0;
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'D'
+ testParallelizationWidth(scan, i);
+ }
+
+ private void testParallelizationWidth(HBaseGroupScan scan, int i) {
+ try {
+ scan.getSpecificScan(i);
+ fail("Should not have " + i + "th assignment or you have not enabled Java assertion.");
+ } catch (AssertionError e) { }
+ }
+}
[4/6] git commit: DRILL-818: Fix output type while adding date and
interval types.
Posted by ja...@apache.org.
DRILL-818: Fix output type while adding date and interval types.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/292765e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/292765e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/292765e7
Branch: refs/heads/master
Commit: 292765e7211d6ddddb64e67bdcff64829a60a615
Parents: d52e325
Author: Mehant Baid <me...@gmail.com>
Authored: Sun Jun 1 15:29:32 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:13:07 2014 -0700
----------------------------------------------------------------------
.../DateIntervalArithmeticFunctions.java | 4 ++++
.../drill/exec/record/vector/TestDateTypes.java | 2 +-
.../org/apache/drill/jdbc/test/TestJdbcQuery.java | 15 +++++++++++++--
3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
index 9103a86..4fbfdf9 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateIntervalArithmeticFunctions.java
@@ -88,7 +88,11 @@ public class ${datetype}${intervaltype}Functions {
@Param ${datetype}Holder left;
@Param ${intervaltype}Holder right;
@Workspace org.joda.time.MutableDateTime temp;
+ <#if datetype == "Date" && (intervaltype.startsWith("Interval"))>
+ @Output TimeStampHolder out;
+ <#else>
@Output ${datetype}Holder out;
+ </#if>
public void setup(RecordBatch incoming) {
<#if datetype == "TimeStampTZ">
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
index adae024..f4fd7ae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestDateTypes.java
@@ -293,7 +293,7 @@ public class TestDateTypes extends PopUnitTestBase {
ValueVector.Accessor accessor = v.getValueVector().getAccessor();
- assertEquals((accessor.getObject(0).toString()), ("2008-03-27"));
+ assertEquals((accessor.getObject(0).toString()), ("2008-03-27 00:00:00.000"));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/292765e7/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index e684228..088191c 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -44,6 +44,7 @@ import org.junit.rules.TestRule;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -764,7 +765,9 @@ import static org.junit.Assert.fail;
// show tables on view
ResultSet resultSet = statement.executeQuery("select date '2008-2-23', time '12:23:34', timestamp '2008-2-23 12:23:34.456', " +
- "interval '1' year, interval '2' day " +
+ "interval '1' year, interval '2' day, " +
+ "date_add(date '2008-2-23', interval '1 10:20:30' day to second), " +
+ "date_add(date '2010-2-23', 1) " +
"from cp.`employee.json` limit 1");
java.sql.Date date = resultSet.getDate(1);
@@ -772,9 +775,17 @@ import static org.junit.Assert.fail;
java.sql.Timestamp ts = resultSet.getTimestamp(3);
String intervalYear = resultSet.getString(4);
String intervalDay = resultSet.getString(5);
+ java.sql.Timestamp ts1 = resultSet.getTimestamp(6);
+ java.sql.Date date1 = resultSet.getDate(7);
+
+ java.sql.Timestamp result = java.sql.Timestamp.valueOf("2008-2-24 10:20:30");
+ java.sql.Date result1 = java.sql.Date.valueOf("2010-2-24");
+ assertEquals(ts1, result);
+ assertEquals(date1, result1);
System.out.println("Date: " + date.toString() + " time: " + time.toString() + " timestamp: " + ts.toString() +
- "\ninterval year: " + intervalYear + " intervalDay: " + intervalDay);
+ "\ninterval year: " + intervalYear + " intervalDay: " + intervalDay +
+ " date_interval_add: " + ts1.toString() + "date_int_add: " + date1.toString());
statement.close();
return null;
[2/6] git commit: DRILL-853: Enable broadcast joins and fix some
issues with BroadcastExchange and ScreenCreator.
Posted by ja...@apache.org.
DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/760cbd42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/760cbd42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/760cbd42
Branch: refs/heads/master
Commit: 760cbd421c131ed43f5011764c7e244b661bd84b
Parents: 623a52e
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed May 28 17:32:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:09 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/ScreenCreator.java | 4 +-
.../BroadcastSenderRootExec.java | 51 +++++++++++++++-----
.../exec/planner/physical/PlannerSettings.java | 11 +++--
.../apache/drill/exec/record/WritableBatch.java | 6 +++
.../apache/drill/exec/rpc/data/DataTunnel.java | 12 ++---
.../server/options/SystemOptionManager.java | 1 +
6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c92633f..9aefbe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -74,6 +74,7 @@ public class ScreenCreator implements RootCreator<Screen>{
public boolean next() {
if(!ok){
stop();
+ context.fail(this.listener.ex);
return false;
}
@@ -135,7 +136,7 @@ public class ScreenCreator implements RootCreator<Screen>{
private SendListener listener = new SendListener();
private class SendListener extends BaseRpcOutcomeListener<Ack>{
-
+ volatile RpcException ex;
@Override
@@ -150,6 +151,7 @@ public class ScreenCreator implements RootCreator<Screen>{
logger.error("Failure while sending data to user.", ex);
ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
ok = false;
+ this.ex = ex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 0a01583..9c55825 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.broadcastsender;
+import io.netty.buffer.ByteBuf;
+
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
@@ -26,13 +28,15 @@ import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.work.ErrorHelper;
/**
* Broadcast Sender broadcasts incoming batches to all receivers (one or more).
@@ -47,7 +51,6 @@ public class BroadcastSenderRootExec implements RootExec {
private final ExecProtos.FragmentHandle handle;
private volatile boolean ok;
private final RecordBatch incoming;
- private final DrillRpcFuture[] responseFutures;
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
@@ -63,12 +66,12 @@ public class BroadcastSenderRootExec implements RootExec {
FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
}
- responseFutures = new DrillRpcFuture[destinations.size()];
}
@Override
public boolean next() {
if(!ok) {
+ context.fail(statusHandler.ex);
return false;
}
@@ -79,24 +82,25 @@ public class BroadcastSenderRootExec implements RootExec {
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
- responseFutures[i] = tunnels[i].sendRecordBatch(context, b2);
+ tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ statusHandler.sendCount.increment();
}
- waitAllFutures(false);
return false;
case OK_NEW_SCHEMA:
case OK:
WritableBatch writableBatch = incoming.getWritableBatch();
+ if (tunnels.length > 1) {
+ writableBatch.retainBuffers(tunnels.length - 1);
+ }
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
- if(i > 0) {
- writableBatch.retainBuffers();
- }
- responseFutures[i] = tunnels[i].sendRecordBatch(context, batch);
+ tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ statusHandler.sendCount.increment();
}
- return waitAllFutures(true);
+ return ok;
case NOT_YET:
default:
@@ -104,6 +108,7 @@ public class BroadcastSenderRootExec implements RootExec {
}
}
+ /*
private boolean waitAllFutures(boolean haltOnError) {
for (DrillRpcFuture<?> responseFuture : responseFutures) {
try {
@@ -124,10 +129,34 @@ public class BroadcastSenderRootExec implements RootExec {
}
return true;
}
-
+*/
+
@Override
public void stop() {
ok = false;
+ statusHandler.sendCount.waitForSendComplete();
incoming.cleanup();
}
+
+ private StatusHandler statusHandler = new StatusHandler();
+ private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
+ volatile RpcException ex;
+ private final SendingAccountor sendCount = new SendingAccountor();
+
+ @Override
+ public void success(Ack value, ByteBuf buffer) {
+ sendCount.decrement();
+ super.success(value, buffer);
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ sendCount.decrement();
+ logger.error("Failure while sending data to user.", ex);
+ ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+ ok = false;
+ this.ex = ex;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 18a32af..ad9fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -22,13 +22,15 @@ import net.hydromatic.optiq.tools.FrameworkContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValidator;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
public class PlannerSettings implements FrameworkContext{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
private int numEndPoints = 0;
private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
- private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold
+
+ public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE;
public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
@@ -36,7 +38,8 @@ public class PlannerSettings implements FrameworkContext{
public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
- public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
+ public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
+ public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000);
public OptionManager options = null;
@@ -88,8 +91,8 @@ public class PlannerSettings implements FrameworkContext{
return options.getOption(BROADCAST.getOptionName()).bool_val;
}
- public int getBroadcastThreshold() {
- return broadcastThreshold;
+ public long getBroadcastThreshold() {
+ return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4ff3708..14ade39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,4 +163,10 @@ public class WritableBatch {
buf.retain();
}
}
+
+ public void retainBuffers(int increment) {
+ for (ByteBuf buf : buffers) {
+ buf.retain(increment);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 1dcd89e..98bbeeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -37,22 +37,22 @@ public class DataTunnel {
}
public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
- SendBatch b = new SendBatch(outcomeListener, batch);
+ SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
manager.runCommand(b);
}
public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
- SendBatchAsync b = new SendBatchAsync(batch, context);
+ SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
manager.runCommand(b);
return b.getFuture();
}
- public static class SendBatch extends ListeningCommand<Ack, DataClientConnection> {
+ private static class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
final FragmentWritableBatch batch;
- public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
+ public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
super(listener);
this.batch = batch;
}
@@ -70,11 +70,11 @@ public class DataTunnel {
}
- public static class SendBatchAsync extends FutureBitCommand<Ack, DataClientConnection> {
+ private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
final FragmentWritableBatch batch;
final FragmentContext context;
- public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) {
+ public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
super();
this.batch = batch;
this.context = context;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8d9a68f..3e90eb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -41,6 +41,7 @@ public class SystemOptionManager implements OptionManager{
PlannerSettings.MERGEJOIN,
PlannerSettings.MULTIPHASE,
PlannerSettings.BROADCAST,
+ PlannerSettings.BROADCAST_THRESHOLD,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR
};
[5/6] git commit: DRILL-855: Improve work assignment parallelization
Posted by ja...@apache.org.
DRILL-855: Improve work assignment parallelization
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d929faac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d929faac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d929faac
Branch: refs/heads/master
Commit: d929faace270302c5e00e272775be98d5a8f83e1
Parents: 292765e
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Wed May 28 04:56:46 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:14:42 2014 -0700
----------------------------------------------------------------------
distribution/src/resources/drill-override.conf | 1 +
.../org/apache/drill/exec/ExecConstants.java | 1 +
.../drill/exec/physical/EndpointAffinity.java | 20 ++++--
.../drill/exec/planner/SimpleExecPlanner.java | 8 +--
.../planner/fragment/SimpleParallelizer.java | 47 ++++++++++----
.../drill/exec/planner/fragment/Wrapper.java | 68 +++++++++++---------
.../apache/drill/exec/work/foreman/Foreman.java | 29 ++-------
.../src/main/resources/drill-module.conf | 16 +++++
.../drill/exec/pop/TestFragmentChecker.java | 4 +-
9 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index a9316a9..da3d094 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -87,6 +87,7 @@ drill.exec: {
work: {
max.width.per.endpoint: 5,
global.max.width: 100,
+ affinity.factor: 1.2,
executor.threads: 4
},
trace: {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d9e0833..e66e93c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,6 +44,7 @@ public interface ExecConstants {
public static final String GLOBAL_MAX_WIDTH = "drill.exec.work.global.max.width";
public static final String MAX_WIDTH_PER_ENDPOINT = "drill.exec.work.max.width.per.endpoint";
public static final String EXECUTOR_THREADS = "drill.exec.work.executor.threads";
+ public static final String AFFINITY_FACTOR = "drill.exec.work.affinity.factor";
public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index f3059ae..df31f74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -19,18 +19,20 @@ package org.apache.drill.exec.physical;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import com.google.protobuf.TextFormat;
+
public class EndpointAffinity implements Comparable<EndpointAffinity>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
-
+
private DrillbitEndpoint endpoint;
private float affinity = 0.0f;
-
+
public EndpointAffinity(DrillbitEndpoint endpoint) {
super();
this.endpoint = endpoint;
}
-
+
public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
super();
this.endpoint = endpoint;
@@ -46,15 +48,19 @@ public class EndpointAffinity implements Comparable<EndpointAffinity>{
public float getAffinity() {
return affinity;
}
-
+
@Override
public int compareTo(EndpointAffinity o) {
return Float.compare(affinity, o.affinity);
}
-
+
public void addAffinity(float f){
affinity += f;
}
-
-
+
+ @Override
+ public String toString() {
+ return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
index 4da6500..132fde7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -51,9 +51,9 @@ public class SimpleExecPlanner implements ExecPlanner{
int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
- return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
- context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint);
-
-
+ parallelizer.setGlobalMaxWidth(maxWidth).setMaxWidthPerEndpoint(maxWidthPerEndpoint);
+ return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(),
+ context.getQueryId(), context.getActiveEndpoints(), context.getPlanReader(), fragmentRoot, planningSet);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 313a81d..d226b08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -51,6 +51,36 @@ public class SimpleParallelizer {
private final Materializer materializer = new Materializer();
/**
+ * The maximum level of parallelization any stage of the query can do. Note that while this
+ * might be the number of active Drillbits, realistically, this could be well beyond that
+ * number if we want to do things like speed results return.
+ */
+ private int globalMaxWidth;
+ public SimpleParallelizer setGlobalMaxWidth(int globalMaxWidth) {
+ this.globalMaxWidth = globalMaxWidth;
+ return this;
+ }
+
+ /**
+ * Limits the maximum level of parallelization to this factor times the number of Drillbits
+ */
+ private int maxWidthPerEndpoint;
+ public SimpleParallelizer setMaxWidthPerEndpoint(int maxWidthPerEndpoint) {
+ this.maxWidthPerEndpoint = maxWidthPerEndpoint;
+ return this;
+ }
+
+
+ /**
+ * Factor by which a node with endpoint affinity will be favored while creating assignment
+ */
+ private double affinityFactor = 1.2f;
+ public SimpleParallelizer setAffinityFactor(double affinityFactor) {
+ this.affinityFactor = affinityFactor;
+ return this;
+ }
+
+ /**
* Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
* beyond the global max width.
*
@@ -60,16 +90,12 @@ public class SimpleParallelizer {
* @param reader Tool used to read JSON plans
* @param rootNode The root node of the PhysicalPlan that we will be parallelizing.
* @param planningSet The set of queries with collected statistics that we'll work with.
- * @param globalMaxWidth The maximum level or parallelization any stage of the query can do. Note that while this
- * might be the number of active Drillbits, realistically, this could be well beyond that
- * number of we want to do things like speed results return.
- * @param maxWidthPerEndpoint Limits the maximum level of parallelization to this factor time the number of Drillbits
* @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
- int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException {
- assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint);
+ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
+ PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException {
+ assignEndpoints(activeEndpoints, planningSet);
return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet);
}
@@ -152,11 +178,9 @@ public class SimpleParallelizer {
}
return new QueryWorkUnit(rootOperator, rootFragment, fragments);
-
}
- private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet,
- int globalMaxWidth, int maxWidthPerEndpoint) throws PhysicalOperatorSetupException {
+ private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet) throws PhysicalOperatorSetupException {
// First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
// cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this
// could be based on endpoint load)
@@ -181,7 +205,8 @@ public class SimpleParallelizer {
// logger.debug("Setting width {} on fragment {}", width, wrapper);
wrapper.setWidth(width);
// figure out endpoint assignments. also informs the exchanges about their respective endpoints.
- wrapper.assignEndpoints(allNodes);
+ wrapper.assignEndpoints(allNodes, affinityFactor);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8602bf0..38cba09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.planner.fragment;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -47,7 +51,7 @@ public class Wrapper {
private int width = -1;
private final Stats stats;
private boolean endpointsAssigned;
- private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+ private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap = Maps.newHashMap();
private long initialAllocation = 0;
private long maxAllocation = 0;
@@ -71,14 +75,14 @@ public class Wrapper {
addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
}
}
-
+
public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
Preconditions.checkState(!endpointsAssigned);
Preconditions.checkNotNull(endpoint);
- EndpointAffinity ea = endpointAffinity.get(endpoint);
+ EndpointAffinity ea = endpointAffinityMap.get(endpoint);
if (ea == null) {
ea = new EndpointAffinity(endpoint);
- endpointAffinity.put(endpoint, ea);
+ endpointAffinityMap.put(endpoint, ea);
}
ea.addAffinity(affinity);
@@ -116,7 +120,6 @@ public class Wrapper {
private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
-
@Override
public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
if(exchange == node.getSendingExchange()){
@@ -148,40 +151,42 @@ public class Wrapper {
public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
return visitChildren(op, value);
}
-
+
}
-
- public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws PhysicalOperatorSetupException {
- Preconditions.checkState(!endpointsAssigned);
+ public void assignEndpoints(Collection<DrillbitEndpoint> allEndpoints, double affinityFactor) throws PhysicalOperatorSetupException {
+ Preconditions.checkState(!endpointsAssigned);
endpointsAssigned = true;
- List<EndpointAffinity> values = Lists.newArrayList();
- values.addAll(endpointAffinity.values());
-
- if (values.size() == 0) {
- List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
- final int div = allPossible.size();
- int start = ThreadLocalRandom.current().nextInt(div);
- // round robin with random start.
- for (int i = start; i < start + width; i++) {
- Preconditions.checkNotNull(all.get(i % div));
- endpoints.add(all.get(i % div));
- }
- } else {
+ if (endpointAffinityMap.size() > 0) {
+ List<EndpointAffinity> affinedEPs = Lists.newArrayList(endpointAffinityMap.values());
// get nodes with highest affinity.
- Collections.sort(values);
- values = Lists.reverse(values);
- for (int i = 0; i < width; i++) {
- Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
- endpoints.add(values.get(i%values.size()).getEndpoint());
+ Collections.sort(affinedEPs);
+ Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(Lists.reverse(affinedEPs));
+ /** Maximum number of slots which should go to endpoints with affinity */
+ int affinedSlots = Math.min((Math.max(1, (int) (affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width);
+ while(endpoints.size() < affinedSlots) {
+ EndpointAffinity ea = affinedEPItr.next();
+ DrillbitEndpoint endpoint = ea.getEndpoint();
+ endpoints.add(endpoint);
+ }
+ }
+ // add other endpoints if required
+ if (endpoints.size() < width) {
+ List<DrillbitEndpoint> all = Lists.newArrayList(allEndpoints);
+ all.removeAll(endpointAffinityMap.keySet());
+ // round robin with random start.
+ Collections.shuffle(all, ThreadLocalRandom.current());
+ Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(all.size() > 0 ? all : endpointAffinityMap.keySet());
+ while (endpoints.size() < width) {
+ endpoints.add(otherEPItr.next());
}
}
// Set scan and store endpoints.
AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
node.getRoot().accept(visitor, endpoints);
-
+
// Set the endpoints for this (one at most) sending exchange.
if (node.getSendingExchange() != null) {
node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
@@ -202,4 +207,5 @@ public class Wrapper {
Preconditions.checkState(endpointsAssigned);
return this.endpoints.get(minorFragmentId);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index eb1d738..b8edb84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -121,7 +121,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
cleanupAndSendResult(result);
}
-
public void cancel() {
if(isFinished()){
return;
@@ -148,8 +147,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
-
-
/**
* Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
*/
@@ -160,10 +157,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
// convert a run query request into action
try{
switch (queryRequest.getType()) {
-
case LOGICAL:
parseAndRunLogicalPlan(queryRequest.getPlan());
-
break;
case PHYSICAL:
parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -185,7 +180,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
-
private void parseAndRunLogicalPlan(String json) {
try {
@@ -249,8 +243,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
-
-
private void parseAndRunPhysicalPlan(String json) {
try {
PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -260,7 +252,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
-
private void runPhysicalPlan(PhysicalPlan plan) {
if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -278,12 +269,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
- SimpleParallelizer parallelizer = new SimpleParallelizer();
+ SimpleParallelizer parallelizer = new SimpleParallelizer()
+ .setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH))
+ .setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT))
+ .setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR));
try {
- QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
- context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
- context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
+ QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(),
+ queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet);
this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager);
List<PlanFragment> leafFragments = Lists.newArrayList();
@@ -292,17 +285,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
// store fragments in distributed grid.
logger.debug("Storing fragments");
for (PlanFragment f : work.getFragments()) {
-
// store all fragments in grid since they are part of handshake.
-
context.getCache().storeFragment(f);
if (f.getLeafFragment()) {
leafFragments.add(f);
} else {
intermediateFragments.add(f);
}
-
-
}
logger.debug("Fragments stored.");
@@ -311,7 +300,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
logger.debug("Fragments running.");
-
} catch (ExecutionSetupException | RpcException e) {
fail("Failure while setting up query.", e);
}
@@ -352,7 +340,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
return this.state.getState();
}
-
class ForemanManagerListener{
void fail(String message, Throwable t) {
ForemanManagerListener.this.fail(message, t);
@@ -364,13 +351,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
-
-
@Override
public int compareTo(Object o) {
return o.hashCode() - o.hashCode();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 26205bd..f8396bb 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
// This file tells Drill to consider this module when class path scanning.
// This file can also include any supplementary configuration information.
// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
@@ -76,6 +91,7 @@ drill.exec: {
work: {
max.width.per.endpoint: 5,
global.max.width: 100,
+ affinity.factor: 1.2,
executor.threads: 4
},
trace: {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 1b38dce..ea19351 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -59,8 +59,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
endpoints.add(b1);
}
-
- QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+ par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5);
+ QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
System.out.print(qwu.getRootFragment().getFragmentJson());