You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/09 09:23:51 UTC
[4/8] drill git commit: DRILL-2187: Single Broadcast Sender
DRILL-2187: Single Broadcast Sender
Also includes:
1. Fix merge join planning issue (1c5c810 by jinfengni)
2. ExternalSort: Check whether enough memory is available for in-memory sorting when deciding whether to spill (36f9dd1)
3. Cleanup in ExternalSortBatch and its helper classes (36f9dd1)
4. MergeJoinBatch: Limit the outgoing record batch size to 2^15 (37dfeb8)
5. StreamingAggBatch: Limit outgoing record batch size to 2^15 (7d8a2e4)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9fd1430d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9fd1430d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9fd1430d
Branch: refs/heads/master
Commit: 9fd1430db69309c2e476007de851142f778ddb62
Parents: 6be9e3b
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Feb 1 08:43:47 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun Mar 8 22:48:45 2015 -0700
----------------------------------------------------------------------
.../src/main/java/io/netty/buffer/DrillBuf.java | 78 +-
.../java/io/netty/buffer/FakeAllocator.java | 6 +
.../org/apache/drill/exec/memory/Accountor.java | 12 +-
.../drill/exec/memory/BufferAllocator.java | 9 +-
.../drill/exec/memory/TopLevelAllocator.java | 17 +-
.../apache/drill/exec/ops/FragmentContext.java | 10 +-
.../exec/physical/impl/SingleSenderCreator.java | 2 +-
.../impl/aggregate/StreamingAggTemplate.java | 90 +-
.../BroadcastSenderRootExec.java | 64 +-
.../exec/physical/impl/join/JoinStatus.java | 8 +-
.../exec/physical/impl/join/JoinTemplate.java | 10 +-
.../PartitionSenderRootExec.java | 3 +-
.../partitionsender/PartitionerTemplate.java | 2 +-
.../impl/sort/SortRecordBatchBuilder.java | 47 +-
.../physical/impl/xsort/ExternalSortBatch.java | 114 +--
.../exec/physical/impl/xsort/MSortTemplate.java | 14 +-
.../impl/xsort/PriorityQueueCopier.java | 3 +
.../impl/xsort/PriorityQueueCopierTemplate.java | 7 +-
.../exec/planner/logical/DrillRuleSets.java | 13 +-
.../planner/physical/BroadcastExchangePrel.java | 13 +-
.../physical/DrillDistributionTraitDef.java | 3 +-
.../exec/planner/physical/HashJoinPrule.java | 25 +-
.../exec/planner/physical/JoinPruleBase.java | 79 +-
.../exec/planner/physical/MergeJoinPrel.java | 1 +
.../exec/planner/physical/MergeJoinPrule.java | 23 +-
.../exec/planner/physical/PlannerSettings.java | 7 +-
.../exec/planner/physical/ProjectPrule.java | 2 +-
.../exec/planner/physical/StreamAggPrule.java | 6 +-
.../planner/physical/SubsetTransformer.java | 14 +-
.../drill/exec/proto/helper/QueryIdHelper.java | 8 +
.../exec/record/FragmentWritableBatch.java | 37 +-
.../drill/exec/record/RawFragmentBatch.java | 17 +-
.../drill/exec/record/VectorAccessible.java | 7 -
.../java/org/apache/drill/exec/rpc/RpcBus.java | 1 +
.../apache/drill/exec/rpc/data/AckSender.java | 63 ++
.../apache/drill/exec/rpc/data/DataClient.java | 2 +-
.../exec/rpc/data/DataConnectionCreator.java | 4 +-
.../exec/rpc/data/DataConnectionManager.java | 18 +-
.../rpc/data/DataProtobufLengthDecoder.java | 19 +-
.../exec/rpc/data/DataResponseHandler.java | 7 +-
.../exec/rpc/data/DataResponseHandlerImpl.java | 28 +-
.../drill/exec/rpc/data/DataRpcConfig.java | 2 +-
.../apache/drill/exec/rpc/data/DataServer.java | 80 +-
.../server/options/SystemOptionManager.java | 1 +
.../exec/work/batch/ResponseSenderQueue.java | 11 +-
.../exec/work/batch/SpoolingRawBatchBuffer.java | 2 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 10 +-
.../impl/broadcastsender/TestBroadcast.java | 50 ++
.../apache/drill/exec/server/TestBitRpc.java | 19 +-
.../work/batch/TestUnlimitedBatchBuffer.java | 34 +-
.../test/resources/broadcast/customer/cust.json | 4 +
.../src/test/resources/broadcast/sales/f1.json | 2 +
.../src/test/resources/broadcast/sales/f2.json | 2 +
.../org/apache/drill/exec/proto/BitData.java | 879 ++++++++++---------
.../apache/drill/exec/proto/SchemaBitData.java | 71 +-
.../exec/proto/beans/BitClientHandshake.java | 24 -
.../exec/proto/beans/FragmentRecordBatch.java | 114 ++-
protocol/src/main/protobuf/BitData.proto | 15 +-
58 files changed, 1324 insertions(+), 889 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 8e9d395..d87fb76 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,6 +27,7 @@ import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.exec.memory.Accountor;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +44,7 @@ public final class DrillBuf extends AbstractByteBuf {
private final long addr;
private final int offset;
private final boolean rootBuffer;
-
+ private final AtomicLong rootRefCnt = new AtomicLong(1);
private volatile BufferAllocator allocator;
private volatile Accountor acct;
private volatile int length;
@@ -83,11 +84,36 @@ public final class DrillBuf extends AbstractByteBuf {
this.acct = a;
this.length = 0;
this.addr = 0;
- this.rootBuffer = true;
+ this.rootBuffer = false;
this.offset = 0;
}
+ /**
+ * Special constructor used for RPC ownership transfer. Takes a snapshot slice of the current buf
+ * but points directly to the underlying UnsafeDirectLittleEndian buffer. Does this by calling unwrap()
+ * twice on the provided DrillBuf and expecting an UnsafeDirectLittleEndian buffer. This operation
+ * takes a new reference count on the underlying buffer and maintains its own local reference
+ * count for this buffer (masking the underlying reference count).
+ * @param allocator Allocator returned when users try to receive the allocator from this buffer.
+ * @param a Accountor used for accounting purposes.
+ * @param b The DrillBuf whose underlying buffer is sliced and retained.
+ */
+ public DrillBuf(BufferAllocator allocator, Accountor a, DrillBuf b) {
+ this(allocator, a, getUnderlying(b), b, 0, b.length, true);
+ assert b.unwrap().unwrap() instanceof UnsafeDirectLittleEndian;
+ b.unwrap().unwrap().retain();
+ }
+
+
private DrillBuf(DrillBuf buffer, int index, int length) {
+ this(buffer.allocator, null, buffer, buffer, index, length, false);
+ }
+
+ private static ByteBuf getUnderlying(DrillBuf b){
+ ByteBuf underlying = b.unwrap().unwrap();
+ return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
+ }
+ private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
super(length);
if (index < 0 || index > buffer.capacity() - length) {
throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + index + ", " + length + ')');
@@ -96,13 +122,13 @@ public final class DrillBuf extends AbstractByteBuf {
this.length = length;
writerIndex(length);
- this.b = buffer;
+ this.b = replacement;
this.addr = buffer.memoryAddress() + index;
this.offset = index;
- this.acct = null;
+ this.acct = a;
this.length = length;
- this.rootBuffer = false;
- this.allocator = buffer.allocator;
+ this.rootBuffer = root;
+ this.allocator = allocator;
}
public void setOperatorContext(OperatorContext c) {
@@ -132,7 +158,12 @@ public final class DrillBuf extends AbstractByteBuf {
@Override
public int refCnt() {
- return b.refCnt();
+ if(rootBuffer){
+ return (int) this.rootRefCnt.get();
+ }else{
+ return b.refCnt();
+ }
+
}
private long addr(int index) {
@@ -203,20 +234,26 @@ public final class DrillBuf extends AbstractByteBuf {
@Override
public synchronized boolean release() {
- if (b.release() && rootBuffer) {
- acct.release(this, length);
- return true;
- }
- return false;
+ return release(1);
}
+ /**
+ * Release the provided number of reference counts. If this is a root buffer, will decrease accounting if the local reference count returns to zero.
+ */
@Override
public synchronized boolean release(int decrement) {
- if (b.release(decrement) && rootBuffer) {
- acct.release(this, length);
- return true;
+
+ if(rootBuffer){
+ if(0 == this.rootRefCnt.addAndGet(-decrement)){
+ b.release(decrement);
+ acct.release(this, length);
+ return true;
+ }else{
+ return false;
+ }
+ }else{
+ return b.release(decrement);
}
- return false;
}
@Override
@@ -405,14 +442,17 @@ public final class DrillBuf extends AbstractByteBuf {
@Override
public ByteBuf retain(int increment) {
- b.retain(increment);
+ if(rootBuffer){
+ this.rootRefCnt.addAndGet(increment);
+ }else{
+ b.retain(increment);
+ }
return this;
}
@Override
public ByteBuf retain() {
- b.retain();
- return this;
+ return retain(1);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
index 3de0a75..721aff9 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.util.Pointer;
class FakeAllocator implements BufferAllocator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FakeAllocator.class);
@@ -155,4 +156,9 @@ class FakeAllocator implements BufferAllocator {
}
+ @Override
+ public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut) {
+ throw new UnsupportedOperationException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 2b48ef0..eb932ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -110,8 +110,18 @@ public class Accountor {
}
public boolean transferTo(Accountor target, DrillBuf buf, long size) {
+ return transfer(target, buf, size, true);
+ }
+
+ public boolean transferIn(DrillBuf buf, long size) {
+ return transfer(this, buf, size, false);
+ }
+
+ private boolean transfer(Accountor target, DrillBuf buf, long size, boolean release) {
boolean withinLimit = target.forceAdditionalReservation(size);
- release(buf, size);
+ if(release){
+ release(buf, size);
+ }
if (ENABLE_ACCOUNTING) {
target.buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace()));
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 83d9d1e..30b905f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -19,11 +19,12 @@ package org.apache.drill.exec.memory;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
import java.io.Closeable;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.util.Pointer;
/**
* Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
@@ -63,6 +64,12 @@ public interface BufferAllocator extends Closeable {
*/
public boolean takeOwnership(DrillBuf buf) ;
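+ /**
+ * Take over ownership of fragment accounting. Always takes over ownership.
+ * @param buf The buffer whose accounting is being taken over.
+ * @param bufOut Pointer that receives the new DrillBuf created for this allocator.
+ * @return false if over allocation.
+ */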
+ public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> bufOut);
public PreAllocator getNewPreAllocator();
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 67e1fdb..2a28bcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -25,15 +25,14 @@ import io.netty.buffer.UnsafeDirectLittleEndian;
import java.util.IdentityHashMap;
import java.util.HashMap;
import java.util.Map;
-
import java.util.Map.Entry;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
-
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.Pointer;
public class TopLevelAllocator implements BufferAllocator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
@@ -75,6 +74,13 @@ public class TopLevelAllocator implements BufferAllocator {
return buf.transferAccounting(acct);
}
+ @Override
+ public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+ DrillBuf b = new DrillBuf(this, acct, buf);
+ out.value = b;
+ return acct.transferIn(b, b.capacity());
+ }
+
public DrillBuf buffer(int min, int max) {
if (min == 0) {
return empty;
@@ -198,6 +204,13 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
+ public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+ DrillBuf b = new DrillBuf(this, acct, buf);
+ out.value = b;
+ return acct.transferIn(b, b.capacity());
+ }
+
+ @Override
public DrillBuf buffer(int size, int max) {
if (size == 0) {
return empty;
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e413921..108f5bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -60,7 +60,7 @@ public class FragmentContext implements Closeable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
- private Map<FragmentHandle, DataTunnel> tunnels = Maps.newHashMap();
+ private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
private final UserClientConnection connection;
@@ -239,11 +239,11 @@ public class FragmentContext implements Closeable {
return context.getController().getTunnel(endpoint);
}
- public DataTunnel getDataTunnel(DrillbitEndpoint endpoint, FragmentHandle remoteHandle) {
- DataTunnel tunnel = tunnels.get(remoteHandle);
+ public DataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+ DataTunnel tunnel = tunnels.get(endpoint);
if (tunnel == null) {
- tunnel = context.getDataConnectionsPool().getTunnel(endpoint, remoteHandle);
- tunnels.put(remoteHandle, tunnel);
+ tunnel = context.getDataConnectionsPool().getTunnel(endpoint);
+ tunnels.put(endpoint, tunnel);
}
return tunnel;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 6db9f4a..812c89c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -80,7 +80,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
this.config = config;
this.recMajor = config.getOppositeMajorFragmentId();
FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build();
- this.tunnel = context.getDataTunnel(config.getDestination(), opposite);
+ this.tunnel = context.getDataTunnel(config.getDestination());
this.context = context;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 14e6aff..36f9f29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -29,7 +29,8 @@ import org.apache.drill.exec.record.VectorWrapper;
public abstract class StreamingAggTemplate implements StreamingAggregator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
private static final boolean EXTRA_DEBUG = false;
- private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
+ private static final int OUTPUT_BATCH_SIZE = 32*1024;
+
private IterOutcome lastOutcome = null;
private boolean first = true;
private boolean newSchema = false;
@@ -37,14 +38,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
private int underlyingIndex = 0;
private int currentIndex;
private int addedRecordCount = 0;
- private boolean pendingOutput = false;
private IterOutcome outcome;
private int outputCount = 0;
private RecordBatch incoming;
- private BatchSchema schema;
private StreamingAggBatch outgoing;
private FragmentContext context;
- private InternalBatch remainderBatch;
private boolean done = false;
@@ -52,7 +50,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
this.context = context;
this.incoming = incoming;
- this.schema = incoming.getSchema();
this.outgoing = outgoing;
setupInterior(incoming, outgoing);
}
@@ -74,12 +71,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return outputCount;
}
- private AggOutcome tooBigFailure() {
- context.fail(new Exception(TOO_BIG_ERROR));
- this.outcome = IterOutcome.STOP;
- return AggOutcome.CLEANUP_AND_RETURN;
- }
-
@Override
public AggOutcome doWork() {
if (done) {
@@ -118,25 +109,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
}
- // pick up a remainder batch if we have one.
- if (remainderBatch != null) {
- outputToBatch( previousIndex );
- remainderBatch.clear();
- remainderBatch = null;
- return setOkAndReturn();
- }
-
-
- // setup for new output and pick any remainder.
- if (pendingOutput) {
- allocateOutgoing();
- pendingOutput = false;
- if (EXTRA_DEBUG) {
- logger.debug("Attempting to output remainder.");
- }
- outputToBatch( previousIndex);
- }
-
if (newSchema) {
return AggOutcome.UPDATE_AGGREGATOR;
}
@@ -167,18 +139,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (EXTRA_DEBUG) {
logger.debug("Values were different, outputting previous batch.");
}
- outputToBatch(previousIndex);
- if (EXTRA_DEBUG) {
- logger.debug("Output successful.");
+ if(!outputToBatch(previousIndex)) {
+ // There is still space in outgoing container, so proceed to the next input.
+ if (EXTRA_DEBUG) {
+ logger.debug("Output successful.");
+ }
+ addRecordInc(currentIndex);
+ } else {
+ if (EXTRA_DEBUG) {
+ logger.debug("Output container has reached its capacity. Flushing it.");
+ }
+
+ // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls.
+ previousIndex = currentIndex;
+ incIndex();
+ return setOkAndReturn();
}
- addRecordInc(currentIndex);
}
previousIndex = currentIndex;
}
-
InternalBatch previous = null;
-
try {
while (true) {
if (previous != null) {
@@ -196,7 +177,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (first && addedRecordCount == 0) {
return setOkAndReturn();
} else if(addedRecordCount > 0) {
- outputToBatchPrev( previous, previousIndex, outputCount);
+ outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+ // (output container full or not) as we are not going to insert any more records.
if (EXTRA_DEBUG) {
logger.debug("Received no more batches, returning.");
}
@@ -218,7 +200,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
}
if (addedRecordCount > 0) {
- outputToBatchPrev( previous, previousIndex, outputCount);
+ outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+ // (output container full or not) as we are not going to insert any more records.
if (EXTRA_DEBUG) {
logger.debug("Wrote out end of previous batch, returning.");
}
@@ -249,7 +232,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
previousIndex = currentIndex;
if (addedRecordCount > 0) {
- outputToBatchPrev( previous, previousIndex, outputCount);
+ if (!outputToBatchPrev(previous, previousIndex, outputCount)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("Output container is full. flushing it.");
+ return setOkAndReturn();
+ }
+ }
continue outside;
}
}
@@ -263,8 +251,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
} finally {
- // make sure to clear previous if we haven't saved it.
- if (remainderBatch == null && previous != null) {
+ // make sure to clear previous
+ if (previous != null) {
previous.clear();
}
}
@@ -303,7 +291,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.RETURN_OUTCOME;
}
- private final void outputToBatch(int inIndex) {
+ // Returns output container status after insertion of the given record. Caller must check the return value if it
+ // plans to insert more records into outgoing container.
+ private final boolean outputToBatch(int inIndex) {
+ assert outputCount < OUTPUT_BATCH_SIZE:
+ "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
outputRecordKeys(inIndex, outputCount);
@@ -315,15 +307,24 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
resetValues();
outputCount++;
addedRecordCount = 0;
+
+ return outputCount == OUTPUT_BATCH_SIZE;
}
- private final void outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+ // Returns output container status after insertion of the given record. Caller must check the return value if it
+ // plans to insert more records into the outgoing container.
+ private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+ assert outputCount < OUTPUT_BATCH_SIZE:
+ "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
+
outputRecordKeysPrev(b1, inIndex, outIndex);
outputRecordValues(outIndex);
resetValues();
resetValues();
outputCount++;
addedRecordCount = 0;
+
+ return outputCount == OUTPUT_BATCH_SIZE;
}
private void addRecordInc(int index) {
@@ -333,9 +334,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
@Override
public void cleanup() {
- if (remainderBatch != null) {
- remainderBatch.clear();
- }
}
public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 22fa047..c255033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -43,6 +43,8 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.work.ErrorHelper;
+import com.google.common.collect.ArrayListMultimap;
+
/**
* Broadcast Sender broadcasts incoming batches to all receivers (one or more).
* This is useful in cases such as broadcast join where sending the entire table to join
@@ -52,6 +54,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
private final FragmentContext context;
private final BroadcastSender config;
+
+ private final int[][] receivingMinorFragments;
private final DataTunnel[] tunnels;
private final ExecProtos.FragmentHandle handle;
private volatile boolean ok;
@@ -70,20 +74,40 @@ public class BroadcastSenderRootExec extends BaseRootExec {
RecordBatch incoming,
BroadcastSender config) throws OutOfMemoryException {
super(context, new OperatorContext(config, context, null, false), config);
- //super(context, config);
this.ok = true;
this.context = context;
this.incoming = incoming;
this.config = config;
this.handle = context.getHandle();
List<DrillbitEndpoint> destinations = config.getDestinations();
- this.tunnels = new DataTunnel[destinations.size()];
+ ArrayListMultimap<DrillbitEndpoint, Integer> dests = ArrayListMultimap.create();
+
for(int i = 0; i < destinations.size(); ++i) {
- FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
- tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
+ dests.put(destinations.get(i), i);
}
+
+ int destCount = dests.keySet().size();
+ int i = 0;
+
+ this.tunnels = new DataTunnel[destCount];
+ this.receivingMinorFragments = new int[destCount][];
+ for(DrillbitEndpoint ep : dests.keySet()){
+ List<Integer> minorsList= dests.get(ep);
+ int[] minorsArray = new int[minorsList.size()];
+ int x = 0;
+ for(Integer m : minorsList){
+ minorsArray[x++] = m;
+ }
+ receivingMinorFragments[i] = minorsArray;
+ tunnels[i] = context.getDataTunnel(ep);
+ i++;
+ }
+
+
}
+
+
@Override
public boolean innerNext() {
if(!ok) {
@@ -97,14 +121,15 @@ public class BroadcastSenderRootExec extends BaseRootExec {
case STOP:
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
- FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
+ FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i]);
stats.startWait();
try {
tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ statusHandler.sendCount.increment();
} finally {
stats.stopWait();
}
- statusHandler.sendCount.increment();
+
}
return false;
@@ -116,15 +141,15 @@ public class BroadcastSenderRootExec extends BaseRootExec {
writableBatch.retainBuffers(tunnels.length - 1);
}
for (int i = 0; i < tunnels.length; ++i) {
- FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
+ FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), receivingMinorFragments[i], writableBatch);
updateStats(batch);
stats.startWait();
try {
tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ statusHandler.sendCount.increment();
} finally {
stats.stopWait();
}
- statusHandler.sendCount.increment();
}
return ok;
@@ -140,29 +165,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
}
- /*
- private boolean waitAllFutures(boolean haltOnError) {
- for (DrillRpcFuture<?> responseFuture : responseFutures) {
- try {
- GeneralRPCProtos.Ack ack = (GeneralRPCProtos.Ack) responseFuture.checkedGet();
- if(!ack.getOk()) {
- ok = false;
- if (haltOnError) {
- return false;
- }
- }
- } catch (RpcException e) {
- logger.error("Error sending batch to receiver: " + e);
- ok = false;
- if (haltOnError) {
- return false;
- }
- }
- }
- return true;
- }
-*/
-
@Override
public void stop() {
ok = false;
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8dfc8f1..299fd3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -29,7 +29,9 @@ import org.eigenbase.rel.JoinRelType;
* The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas.
*/
public final class JoinStatus {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+
+ private static final int OUTPUT_BATCH_SIZE = 32*1024;
public static enum RightSourceMode {
INCOMING, SV4;
@@ -165,6 +167,10 @@ public final class JoinStatus {
outputPosition = 0;
}
+ public final boolean isOutgoingBatchFull() {
+ return outputPosition == OUTPUT_BATCH_SIZE;
+ }
+
public final void incOutputPos() {
outputPosition++;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 4124e6d..7da9788 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -75,8 +75,6 @@ import org.eigenbase.rel.JoinRelType;
*/
public abstract class JoinTemplate implements JoinWorker {
- private static final int OUTPUT_BATCH_SIZE = 32*1024;
-
@Override
public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
doSetup(context, status, outgoing);
@@ -88,7 +86,7 @@ public abstract class JoinTemplate implements JoinWorker {
* @return true of join succeeded; false if the worker needs to be regenerated
*/
public final boolean doJoin(final JoinStatus status) {
- for (int i = 0; i < OUTPUT_BATCH_SIZE; i++) {
+ while(!status.isOutgoingBatchFull()) {
// for each record
// validate input iterators (advancing to the next record batch if necessary)
@@ -96,6 +94,9 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
+ if (status.isOutgoingBatchFull()) {
+ return false;
+ }
doCopyLeft(status.getLeftPosition(), status.getOutPosition());
status.incOutputPos();
@@ -139,6 +140,9 @@ public abstract class JoinTemplate implements JoinWorker {
boolean crossedBatchBoundaries = false;
int initialRightPosition = status.getRightPosition();
do {
+ if (status.isOutgoingBatchFull()) {
+ return false;
+ }
// copy all equal right keys to the output record batch
doCopyLeft(status.getLeftPosition(), status.getOutPosition());
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index a23bd7a..200e78e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -271,8 +271,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
int fieldId = 0;
StatusHandler statusHandler = new StatusHandler(sendCount, context);
for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
- FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
- DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
+ DataTunnel tunnel = context.getDataTunnel(endpoint);
FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
isLast,
handle.getQueryId(),
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 71ffd41..79076cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -97,7 +97,7 @@ public abstract class PartitionerTemplate implements Partitioner {
for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
- context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId, statusHandler));
+ context.getDataTunnel(endpoint), context, oContext.getAllocator(), fieldId, statusHandler));
fieldId++;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 707c41c..e559ece 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.impl.sort;
import java.util.ArrayList;
import java.util.List;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
@@ -63,7 +65,8 @@ public class SortRecordBatchBuilder {
}
/**
- * Add another record batch to the set of record batches.
+ * Add another record batch to the set of record batches. TODO: Refactor this and the other
+ * {@link #add(RecordBatchData)} method into one method.
* @param batch
* @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
* @throws SchemaChangeException
@@ -98,19 +101,28 @@ public class SortRecordBatchBuilder {
return true;
}
- public boolean add(RecordBatchData rbd) {
+ public void add(RecordBatchData rbd) {
long batchBytes = getSize(rbd.getContainer());
if (batchBytes == 0 && batches.size() > 0) {
- return true;
+ return;
}
+
if(batchBytes + runningBytes > maxBytes) {
- return false; // enough data memory.
+ final String errMsg = String.format("Adding this batch causes the total size to exceed max allowed size. " +
+ "Current runningBytes %d, Incoming batchBytes %d. maxBytes %d", runningBytes, batchBytes, maxBytes);
+ logger.error(errMsg);
+ throw new DrillRuntimeException(errMsg);
}
- if(runningBatches+1 > Character.MAX_VALUE) {
- return false; // allowed in batch.
+ if(runningBatches >= Character.MAX_VALUE) {
+ final String errMsg = String.format("Tried to add more than %d number of batches.", Character.MAX_VALUE);
+ logger.error(errMsg);
+ throw new DrillRuntimeException(errMsg);
}
if(!svAllocator.preAllocate(rbd.getRecordCount()*4)) {
- return false; // sv allocation available.
+ final String errMsg = String.format("Failed to pre-allocate memory for SV. " + "Existing recordCount*4 = %d, " +
+ "incoming batch recordCount*4 = %d", recordCount * 4, rbd.getRecordCount() * 4);
+ logger.error(errMsg);
+ throw new DrillRuntimeException(errMsg);
}
@@ -120,12 +132,11 @@ public class SortRecordBatchBuilder {
if (sv2 != null) {
sv2.clear();
}
- return true;
+ return;
}
runningBytes += batchBytes;
batches.put(rbd.getContainer().getSchema(), rbd);
recordCount += rbd.getRecordCount();
- return true;
}
public void canonicalize() {
@@ -149,7 +160,12 @@ public class SortRecordBatchBuilder {
if (batches.keys().size() < 1) {
assert false : "Invalid to have an empty set of batches with no schemas.";
}
- sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+
+ final DrillBuf svBuffer = svAllocator.getAllocation();
+ if (svBuffer == null) {
+ throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
+ }
+ sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
@@ -225,4 +241,15 @@ public class SortRecordBatchBuilder {
return containerList;
}
+ /**
+ * For a given recordCount, how much memory SortRecordBatchBuilder needs for its own purposes. This is used in
+ * ExternalSortBatch to make decisions about whether to spill or not.
+ *
+ * @param recordCount
+ * @return
+ */
+ public static long memoryNeeded(int recordCount) {
+ // We need 4 bytes (SV4) for each record.
+ return recordCount * 4;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index f320bbb..c03d6a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -65,7 +65,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
@@ -77,27 +76,24 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
private static final long MAX_SORT_BYTES = 1L * 1024 * 1024 * 1024;
- public static int SPILL_TARGET_RECORD_COUNT;
- public static int TARGET_RECORD_COUNT;
- public static int SPILL_BATCH_GROUP_SIZE;
- public static int SPILL_THRESHOLD;
- public static List<String> SPILL_DIRECTORIES;
- private Iterator<String> dirs;
-
- public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
- public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
-
-
+ private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+ private static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+ private static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+ private static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+ private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
+
+ private final int SPILL_BATCH_GROUP_SIZE;
+ private final int SPILL_THRESHOLD;
+ private final List<String> SPILL_DIRECTORIES;
+ private final Iterator<String> dirs;
private final RecordBatch incoming;
+ private final BufferAllocator copierAllocator;
+
private BatchSchema schema;
private SingleBatchSorter sorter;
private SortRecordBatchBuilder builder;
private MSorter mSorter;
private PriorityQueueCopier copier;
- private BufferAllocator copierAllocator;
private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
private SelectionVector4 sv4;
@@ -105,12 +101,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
- private boolean useIncomingSchema = false;
private boolean first = true;
private long totalSizeInMemory = 0;
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
- private boolean stop = false;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
@@ -123,54 +117,29 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} catch (IOException e) {
throw new RuntimeException(e);
}
- SPILL_TARGET_RECORD_COUNT = config.getInt(ExecConstants.EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE);
- TARGET_RECORD_COUNT = config.getInt(ExecConstants.EXTERNAL_SORT_TARGET_BATCH_SIZE);
SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
uid = System.nanoTime();
- copierAllocator = oContext.getAllocator().getChildAllocator(context, 10000000, 20000000, true);
+ copierAllocator = oContext.getAllocator().getChildAllocator(
+ context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
}
@Override
public int getRecordCount() {
if (sv4 != null) {
return sv4.getCount();
- } else {
- return container.getRecordCount();
}
- }
-
- @Override
- public void kill(boolean sendUpstream) {
- incoming.kill(sendUpstream);
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
+ return container.getRecordCount();
}
@Override
public SelectionVector4 getSelectionVector4() {
- return this.sv4;
+ return sv4;
}
@Override
- public BatchSchema getSchema() {
- if (useIncomingSchema) {
- List<MaterializedField> fields = Lists.newArrayList();
- for (MaterializedField field : incoming.getSchema()) {
- fields.add(field);
- }
- return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
- }
- return super.getSchema();
- }
-
-
- @Override
public void cleanup() {
if (batchGroups != null) {
for (BatchGroup group: batchGroups) {
@@ -203,16 +172,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
for (VectorWrapper w : incoming) {
ValueVector v = container.addOrGet(w.getField());
if (v instanceof AbstractContainerVector) {
- w.getValueVector().makeTransferPair(v);
+ w.getValueVector().makeTransferPair(v); // Can we remove this hack?
v.clear();
}
- v.allocateNew();
+ v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
}
container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(0);
return;
case STOP:
- stop = true;
case NONE:
state = BatchState.DONE;
default:
@@ -224,13 +192,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public IterOutcome innerNext() {
if (schema != null) {
if (spillCount == 0) {
- if (schema != null) {
- if (getSelectionVector4().next()) {
- return IterOutcome.OK;
- } else {
- return IterOutcome.NONE;
- }
- }
+ return (getSelectionVector4().next()) ? IterOutcome.OK : IterOutcome.NONE;
} else {
Stopwatch w = new Stopwatch();
w.start();
@@ -247,7 +209,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
- long totalcount = 0;
+ int totalCount = 0;
try{
container.clear();
@@ -309,7 +271,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
int count = sv2.getCount();
- totalcount += count;
+ totalCount += count;
// if (count == 0) {
// break outer;
// }
@@ -324,10 +286,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
batchesSinceLastSpill++;
- if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
- (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
- (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
- (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+ if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
+ (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+ // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+ (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
+ // current memory used is more than 95% of memory usage limit of this operator
+ (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+ // current memory used is more than 95% of memory usage limit of this fragment
+ (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
+ // The number of incoming batches (BatchGroups) exceeds the limit and the number of incoming batches
+ // accumulated since the last spill exceeds the defined limit
+ (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+
mergeAndSpill();
batchesSinceLastSpill = 0;
}
@@ -346,7 +316,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
- if (totalcount == 0) {
+ if (totalCount == 0) {
return IterOutcome.NONE;
}
if (spillCount == 0) {
@@ -378,10 +348,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
createCopier(hyperBatch, batchGroups, container);
- int inMemoryRecordCount = 0;
- for (BatchGroup g : batchGroups) {
- inMemoryRecordCount += g.getRecordCount();
- }
+
int estimatedRecordSize = 0;
for (VectorWrapper w : batchGroups.get(0)) {
try {
@@ -390,7 +357,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
estimatedRecordSize += 50;
}
}
- targetRecordCount = (int) Math.max(1, 250 * 1000 / estimatedRecordSize);
+ targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, 250 * 1000 / estimatedRecordSize));
int count = copier.next(targetRecordCount);
container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(count);
@@ -408,6 +375,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
+ private boolean hasMemoryForInMemorySort(int currentRecordCount) {
+ long currentlyAvailable = popConfig.getMaxAllocation() - oContext.getAllocator().getAllocatedMemory();
+
+ long neededForInMemorySort = SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
+ MSortTemplate.memoryNeeded(currentRecordCount);
+
+ return currentlyAvailable > neededForInMemorySort;
+ }
+
public void mergeAndSpill() throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer();
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 3fd744f..94bc3a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Queues;
public abstract class MSortTemplate implements MSorter, IndexedSortable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
- private BufferAllocator allocator;
private SelectionVector4 vector4;
private SelectionVector4 aux;
private long compares;
@@ -46,7 +45,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
@Override
public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
- this.allocator = allocator;
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
this.vector4 = vector4.createNewWrapperCurrent();
@@ -70,6 +68,18 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
}
+ /**
+ * For a given recordCount, how much memory MSorter needs for its own purposes. This is used in
+ * ExternalSortBatch to make decisions about whether to spill or not.
+ *
+ * @param recordCount
+ * @return
+ */
+ public static long memoryNeeded(int recordCount) {
+ // We need 4 bytes (SV4) for each record.
+ return recordCount * 4;
+ }
+
private int merge(int leftStart, int rightStart, int rightEnd, int outStart) {
int l = leftStart;
int r = rightStart;
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index d427744..161ca6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -26,6 +26,9 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
public interface PriorityQueueCopier {
+ public static long initialAllocation = 10000000;
+ public static long maxAllocation = 20000000;
+
public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
VectorAccessible outgoing) throws SchemaChangeException;
public int next(int targetRecordCount);
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index fe67064..f7786b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -30,23 +30,18 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
private SelectionVector4 vector4;
private List<BatchGroup> batchGroups;
private VectorAccessible hyperBatch;
- private FragmentContext context;
- private BufferAllocator allocator;
private VectorAccessible outgoing;
private int size;
private int queueSize = 0;
- private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
@Override
public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
VectorAccessible outgoing) throws SchemaChangeException {
- this.context = context;
- this.allocator = allocator;
this.hyperBatch = hyperBatch;
this.batchGroups = batchGroups;
this.outgoing = outgoing;
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 9c59a9e..496bc9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -162,11 +162,20 @@ public class DrillRuleSets {
}
if (ps.isHashJoinEnabled()) {
- ruleList.add(HashJoinPrule.INSTANCE);
+ ruleList.add(HashJoinPrule.DIST_INSTANCE);
+
+ if(ps.isBroadcastJoinEnabled()){
+ ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+ }
}
if (ps.isMergeJoinEnabled()) {
- ruleList.add(MergeJoinPrule.INSTANCE);
+ ruleList.add(MergeJoinPrule.DIST_INSTANCE);
+
+ if(ps.isBroadcastJoinEnabled()){
+ ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
+ }
+
}
return new DrillRuleSet(ImmutableSet.copyOf(ruleList));
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index 0c76de4..0467a07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -52,11 +52,14 @@ public class BroadcastExchangePrel extends ExchangePrel{
RelNode child = this.getChild();
- double inputRows = RelMetadataQuery.getRowCount(child);
- int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
- double cpuCost = DrillCostBase.SVR_CPU_COST * inputRows ;
- int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
- double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+ final int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
+ final double broadcastFactor = PrelUtil.getSettings(getCluster()).getBroadcastFactor();
+ final double inputRows = RelMetadataQuery.getRowCount(child);
+
+ final int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
+ final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows ;
+ final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+
return new DrillCostBase(inputRows, cpuCost, 0, networkCost);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index 6a1dbb7..ae079a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitDef;
+import org.eigenbase.relopt.volcano.RelSubset;
public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrait>{
public static final DrillDistributionTraitDef INSTANCE = new DrillDistributionTraitDef();
@@ -67,7 +68,7 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
// Source trait is "ANY", which is abstract type of distribution.
// We do not want to convert from "ANY", since it's abstract.
// Source trait should be concrete type: SINGLETON, HASH_DISTRIBUTED, etc.
- if (currentDist.equals(DrillDistributionTrait.DEFAULT)) {
+ if (currentDist.equals(DrillDistributionTrait.DEFAULT) && !(rel instanceof RelSubset) ) {
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 433405a..e802a40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -25,15 +25,19 @@ import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
import org.eigenbase.trace.EigenbaseTrace;
public class HashJoinPrule extends JoinPruleBase {
- public static final RelOptRule INSTANCE = new HashJoinPrule();
+ public static final RelOptRule DIST_INSTANCE = new HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
+ public static final RelOptRule BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+
protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
- private HashJoinPrule() {
- super(
- RelOptHelper.any(DrillJoinRel.class), "Prel.HashJoinPrule");
+ private final boolean isDist;
+ private HashJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
+ super(operand, name);
+ this.isDist = isDist;
}
@Override
@@ -60,14 +64,15 @@ public class HashJoinPrule extends JoinPruleBase {
try {
- createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
-
- if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
- createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
-
- // createBroadcastPlan1(call, join, PhysicalJoinType.HASH_JOIN, left, right, null, null);
+ if(isDist){
+ createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
+ }else{
+ if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+ createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
+ }
}
+
} catch (InvalidRelException e) {
tracer.warning(e.toString());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index afcbf71..77c055c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.planner.common.DrillJoinRelBase;
import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelType;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.metadata.RelMetadataQuery;
@@ -73,13 +74,11 @@ public abstract class JoinPruleBase extends Prule {
}
protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
- if (! PrelUtil.getPlannerSettings(planner).isBroadcastJoinEnabled()) {
- return false;
- }
double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
&& ! left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
+ && (join.getJoinType() == JoinRelType.INNER || join.getJoinType() == JoinRelType.LEFT)
) {
return true;
}
@@ -175,43 +174,69 @@ public abstract class JoinPruleBase extends Prule {
DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
RelTraitSet traitsRight = null;
+ RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+
if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
assert collationLeft != null && collationRight != null;
+ traitsLeft = traitsLeft.plus(collationLeft);
traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
} else {
traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
}
- final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
final RelNode convertedLeft = convert(left, traitsLeft);
final RelNode convertedRight = convert(right, traitsRight);
- new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
-
- @Override
- public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- RelTraitSet newTraitsLeft;
- if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
- } else {
- newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
- }
- Character.digit(1, 1);
- RelNode newLeft = convert(left, newTraitsLeft);
- if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
- return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
- join.getJoinType());
- } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
- join.getJoinType());
- } else{
- return null;
- }
+ boolean traitProp = false;
+
+ if(traitProp){
+ if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+
+ @Override
+ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
+
+ RelNode newLeft = convert(left, newTraitsLeft);
+ return new MergeJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+ join.getJoinType());
+ }
+
+ }.go(join, convertedLeft);
+
+
+ }else{
+
+ new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+
+ @Override
+ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+ RelNode newLeft = convert(left, newTraitsLeft);
+ return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+ join.getJoinType());
+
+ }
+
+ }.go(join, convertedLeft);
}
- }.go(join, convertedLeft);
+ }else{
+ if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
+ join.getJoinType()));
+
+ }else{
+ call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
+ join.getJoinType()));
+ }
+ }
+
+
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index b7e86e3..fac18c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -116,4 +116,5 @@ public class MergeJoinPrel extends JoinPrel {
return SelectionVectorMode.NONE;
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index faffa63..5283467 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -29,18 +29,21 @@ import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.Lists;
public class MergeJoinPrule extends JoinPruleBase {
- public static final RelOptRule INSTANCE = new MergeJoinPrule();
+ public static final RelOptRule DIST_INSTANCE = new MergeJoinPrule("Prel.MergeJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
+ public static final RelOptRule BROADCAST_INSTANCE = new MergeJoinPrule("Prel.MergeJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+
protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
- private MergeJoinPrule() {
- super(
- RelOptHelper.any(DrillJoinRel.class),
- "Prel.MergeJoinPrule");
+ final boolean isDist;
+ private MergeJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
+ super(operand, name);
+ this.isDist = isDist;
}
@Override
@@ -64,10 +67,12 @@ public class MergeJoinPrule extends JoinPruleBase {
RelCollation collationLeft = getCollation(join.getLeftKeys());
RelCollation collationRight = getCollation(join.getRightKeys());
- createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
-
- if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
- createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight);
+ if(isDist){
+ createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
+ }else{
+ if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+ createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight);
+ }
}
} catch (InvalidRelException e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index bbfbbcb..ede0683 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -44,7 +44,8 @@ public class PlannerSettings implements Context{
public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
- public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000);
+ public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000000);
+ public static final OptionValidator BROADCAST_FACTOR = new RangeDoubleValidator("planner.broadcast_factor", 0, Double.MAX_VALUE, 1.0d);
public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d);
public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
@@ -80,6 +81,10 @@ public class PlannerSettings implements Context{
return options.getOption(JOIN_ROW_COUNT_ESTIMATE_FACTOR.getOptionName()).float_val;
}
+ public double getBroadcastFactor(){
+ return options.getOption(BROADCAST_FACTOR.getOptionName()).float_val;
+ }
+
public boolean useDefaultCosting() {
return useDefaultCosting;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 72034ed..b1d5a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -82,7 +82,7 @@ public class ProjectPrule extends Prule {
DrillDistributionTrait newDist = convertDist(childDist, inToOut);
RelCollation newCollation = convertRelCollation(childCollation, inToOut);
- RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation);
+ RelTraitSet newProjectTraits = newTraitSet(Prel.DRILL_PHYSICAL, newDist, newCollation);
return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(), project.getRowType());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 89b133a..929cb6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -54,7 +54,7 @@ public class StreamAggPrule extends AggPruleBase {
@Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
- final RelNode input = aggregate.getChild();
+ RelNode input = aggregate.getChild();
final RelCollation collation = getCollation(aggregate);
RelTraitSet traits = null;
@@ -78,7 +78,7 @@ public class StreamAggPrule extends AggPruleBase {
public RelNode convertChild(final DrillAggregateRel join, final RelNode rel) throws InvalidRelException {
DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
- RelNode newInput = convert(input, traits);
+ RelNode newInput = convert(rel, traits);
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
@@ -130,7 +130,7 @@ public class StreamAggPrule extends AggPruleBase {
public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode rel) throws InvalidRelException {
DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist);
- RelNode newInput = convert(input, traits);
+ RelNode newInput = convert(rel, traits);
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
index d4cd21f..72e06d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.planner.physical;
import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.ConventionTraitDef;
+import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTrait;
import org.eigenbase.relopt.RelTraitSet;
@@ -49,21 +51,25 @@ public abstract class SubsetTransformer<T extends RelNode, E extends Exception>
}
boolean transform = false;
-
for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
- if (!isDefaultDist(rel)) {
- RelNode out = convertChild(n, rel);
+ if (isPhysical(rel)) {
+ RelNode newRel = RelOptRule.convert(candidateSet, rel.getTraitSet().plus(Prel.DRILL_PHYSICAL));
+ RelNode out = convertChild(n, newRel);
if (out != null) {
call.transformTo(out);
transform = true;
-
}
}
}
+
return transform;
}
+ private boolean isPhysical(RelNode n){
+ return n.getTraitSet().getTrait(ConventionTraitDef.INSTANCE).equals(Prel.DRILL_PHYSICAL);
+ }
+
private boolean isDefaultDist(RelNode n) {
return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index bb12a22..1aadaa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.proto.helper;
+import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -39,4 +41,10 @@ public class QueryIdHelper {
public static String getQueryIdentifier(FragmentHandle h) {
return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
}
+
+ public static String getQueryIdentifiers(QueryId queryId, int majorFragmentId, List<Integer> minorFragmentIds) {
+ String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
+ return getQueryId(queryId) + ":" + majorFragmentId + ":" + fragmentIds;
+ }
+
}