Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:45:03 UTC
[25/27] git commit: optimized generated code for partition sender
optimized generated code for partition sender
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ffc674b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ffc674b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ffc674b
Branch: refs/heads/master
Commit: 8ffc674b7ce699aa6905166bf2060bc4e48e45c3
Parents: 4a10ea1
Author: Ben Becker <be...@gmail.com>
Authored: Sat Aug 10 01:20:53 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700
----------------------------------------------------------------------
.../partitionsender/OutgoingRecordBatch.java | 52 ++++-----
.../PartitionSenderRootExec.java | 115 ++++++++++++-------
.../exec/work/RemoteFragmentRunnerListener.java | 11 +-
3 files changed, 107 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 4ab598c..b40ce4c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,11 +19,10 @@
package org.apache.drill.exec.physical.impl.partitionsender;
import java.util.Iterator;
-import java.util.List;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.proto.ExecProtos;
@@ -52,7 +51,6 @@ public class OutgoingRecordBatch implements RecordBatch {
private RecordBatch incoming;
private FragmentContext context;
private BatchSchema outSchema;
- private List<ValueVector> valueVectors;
private VectorContainer vectorContainer;
private int recordCount;
private int recordCapacity;
@@ -65,29 +63,25 @@ public class OutgoingRecordBatch implements RecordBatch {
initializeBatch();
}
- public OutgoingRecordBatch() { }
-
- public void init(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context) {
- this.incoming = incoming;
- this.context = context;
- this.operator = operator;
- this.tunnel = tunnel;
- resetBatch();
- }
-
public void flushIfNecessary() {
- if (recordCount == recordCapacity - 1) flush();
+ if (recordCount == recordCapacity) logger.debug("Flush is necessary: Count is " + recordCount + ", capacity is " + recordCapacity);
+ try {
+ if (recordCount == recordCapacity) flush();
+ } catch (SchemaChangeException e) {
+ // TODO:
+ logger.error("Unable to flush outgoing record batch: " + e);
+ }
}
public void incRecordCount() {
++recordCount;
}
- public void flush() {
+ public void flush() throws SchemaChangeException {
if (recordCount == 0) {
logger.warn("Attempted to flush an empty record batch");
- return;
}
+ logger.debug("Flushing record batch. count is: " + recordCount + ", capacity is " + recordCapacity);
final ExecProtos.FragmentHandle handle = context.getHandle();
FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
handle.getQueryId(),
@@ -96,23 +90,26 @@ public class OutgoingRecordBatch implements RecordBatch {
operator.getOppositeMajorFragmentId(),
0,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+ tunnel.sendRecordBatch(statusHandler, context, writableBatch);
// reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly
// referenced by generated code and must not be replaced.
recordCount = 0;
for (VectorWrapper v : vectorContainer) {
- getAllocator(TypeHelper.getNewVector(v.getField(), context.getAllocator()),
- v.getValueVector()).alloc(recordCapacity);
+ logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector());
+ getAllocator(v.getValueVector(),
+ TypeHelper.getNewVector(v.getField(), context.getAllocator())).alloc(recordCapacity);
}
+ if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
}
+
/**
* Create a new output schema and allocate space for value vectors based on the incoming record batch.
*/
public void initializeBatch() {
+ isLast = false;
recordCapacity = incoming.getRecordCount();
- valueVectors = Lists.newArrayList();
vectorContainer = new VectorContainer();
SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -122,12 +119,13 @@ public class OutgoingRecordBatch implements RecordBatch {
bldr.addField(v.getField());
// allocate a new value vector
- vectorContainer.add(v.getValueVector());
ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
- getAllocator(outgoingVector, v.getValueVector()).alloc(recordCapacity);
- valueVectors.add(outgoingVector);
+ getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
+ vectorContainer.add(outgoingVector);
+ logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
}
outSchema = bldr.build();
+ logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
}
/**
@@ -135,13 +133,11 @@ public class OutgoingRecordBatch implements RecordBatch {
* on the incoming record batch.
*/
public void resetBatch() {
+ isLast = false;
recordCount = 0;
recordCapacity = 0;
- if (valueVectors != null) {
- for(ValueVector v : valueVectors){
- v.close();
- }
- }
+ for (VectorWrapper v : vectorContainer)
+ v.getValueVector().clear();
initializeBatch();
}
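
For context, here is a minimal sketch of the buffer-and-flush contract these methods expose to the generated partitioner code (illustrative only, not part of this patch; the per-field copy step is elided):

    // Illustrative driver for OutgoingRecordBatch's contract.
    static void drive(OutgoingRecordBatch batch, int records) {
      for (int i = 0; i < records; i++) {
        // ...generated code copies one value per field at slot batch.getRecordCount()...
        batch.incRecordCount();
        // flush() fires exactly when recordCount reaches recordCapacity: it sends a
        // FragmentWritableBatch over the BitTunnel, zeroes the count, and reallocates
        // the SAME vector objects, since generated code holds direct references to them.
        batch.flushIfNecessary();
      }
    }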
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 313ddf3..aa25c96 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.partitionsender;
-import com.beust.jcommander.internal.Lists;
import com.sun.codemodel.*;
import org.apache.drill.common.expression.*;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -32,9 +31,10 @@ import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
import java.io.IOException;
-import java.util.List;
class PartitionSenderRootExec implements RootExec {
@@ -56,10 +56,10 @@ class PartitionSenderRootExec implements RootExec {
this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
int fieldId = 0;
for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations())
- outgoing[fieldId] = new OutgoingRecordBatch(operator,
- context.getCommunicator().getTunnel(endpoint),
- incoming,
- context);
+ outgoing[fieldId++] = new OutgoingRecordBatch(operator,
+ context.getCommunicator().getTunnel(endpoint),
+ incoming,
+ context);
try {
createPartitioner();
} catch (SchemaChangeException e) {
@@ -87,19 +87,25 @@ class PartitionSenderRootExec implements RootExec {
partitioner.partitionBatch(incoming);
// send all pending batches
- flushOutgoingBatches(true, false);
+ try {
+ flushOutgoingBatches(true, false);
+ } catch (SchemaChangeException e) {
+ incoming.kill();
+ logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
+ context.fail(e);
+ return false;
+ }
return false;
case OK_NEW_SCHEMA:
- // send all existing batches
- flushOutgoingBatches(false, true);
- // update OutgoingRecordBatch's schema and value vectors
try {
+ // send all existing batches
+ flushOutgoingBatches(false, true);
+ // update OutgoingRecordBatch's schema and generate partitioning code
createPartitioner();
- partitioner.setup(context, incoming, outgoing);
} catch (SchemaChangeException e) {
incoming.kill();
- logger.error("Failed to create partitioning sender during query ", e);
+ logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
context.fail(e);
return false;
}
@@ -133,54 +139,85 @@ class PartitionSenderRootExec implements RootExec {
}
// generate code to copy from an incoming value vector to the destination partition's outgoing value vector
- int fieldId = 0;
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
+ JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array();
+ JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array();
cg.rotateBlock();
- // declare array of record batches for each partition
+ // declare and assign the array of outgoing record batches
JVar outgoingBatches = cg.clazz.field(JMod.NONE,
- cg.getModel().ref(OutgoingRecordBatch.class).array(),
+ outgoingBatchArrayType,
"outgoingBatches");
-
cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing"));
- // declare incoming value vectors
- List<JVar> incomingVVs = Lists.newArrayList();
- for (VectorWrapper<?> vvIn : incoming)
- incomingVVs.add(cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
- fieldId++,
- vvIn.isHyper())));
+ // declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second
+ JVar outgoingVectors = cg.clazz.field(JMod.NONE,
+ outgoingVectorArrayType,
+ "outgoingVectors");
+ // create 2d array and build initialization list. For example:
+ // outgoingVectors = new ValueVector[][] {
+ // new ValueVector[] {vv1, vv2},
+ // new ValueVector[] {vv3, vv4}
+ // };
+ JArray outgoingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
+
+ int fieldId = 0;
int batchId = 0;
- fieldId = 0;
- // generate switch statement for each destination batch
- JSwitch switchStatement = cg.getBlock()._switch(outIndex);
for (OutgoingRecordBatch batch : outgoing) {
- // generate case statement for this batch
- JBlock caseBlock = switchStatement._case(JExpr.lit(batchId)).body();
-
+ JArray outgoingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class));
for (VectorWrapper<?> vv : batch) {
- // declare outgoing value vector and a corresponding counter
+ // declare outgoing value vector and assign it to the array
JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
new TypedFieldId(vv.getField().getType(),
fieldId,
false));
-
- caseBlock.add(outVV.invoke("copyFrom")
- .arg(inIndex)
- .arg(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("getRecordCount"))
- .arg(incomingVVs.get(fieldId)));
+ // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
+ outgoingVectorInitBatch.add(outVV);
++fieldId;
}
- caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("incRecordCount"));
- caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("flushIfNecessary"));
- fieldId = 0;
- caseBlock._break();
+
+ // add VV array to initialization list (e.g. new ValueVector[] { ... })
+ outgoingVectorInit.add(outgoingVectorInitBatch);
++batchId;
+ fieldId = 0;
}
+ // generate outgoing value vector 2d array initialization list.
+ cg.getSetupBlock().assign(outgoingVectors, outgoingVectorInit);
+
+ for (VectorWrapper<?> vvIn : incoming) {
+ // declare incoming value vectors
+ JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
+ fieldId,
+ vvIn.isHyper()));
+
+ // generate the copyFrom() invocation with explicit cast to the appropriate type
+ Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
+ vvIn.getField().getType().getMode());
+ JClass vvClass = cg.getModel().ref(vvType);
+ // the following block generates calls to copyFrom(); e.g.:
+ // ((IntVector) outgoingVectors[outIndex][0]).copyFrom(inIndex,
+ // outgoingBatches[outIndex].getRecordCount(),
+ // vv1);
+ cg.getBlock().add(
+ ((JExpression) JExpr.cast(vvClass,
+ ((JExpression)
+ outgoingVectors
+ .component(outIndex))
+ .component(JExpr.lit(fieldId))))
+ .invoke("copyFrom")
+ .arg(inIndex)
+ .arg(((JExpression) outgoingBatches.component(outIndex)).invoke("getRecordCount"))
+ .arg(incomingVV));
+
+ // generate the OutgoingRecordBatch helper invocations
+ cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("incRecordCount"));
+ cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("flushIfNecessary"));
+ ++fieldId;
+ }
try {
// compile and setup generated code
partitioner = context.getImplementationClassMultipleOutput(cg);
@@ -199,7 +236,7 @@ class PartitionSenderRootExec implements RootExec {
* @param isLastBatch true if this is the last incoming batch
* @param schemaChanged true if the schema has changed
*/
- public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) {
+ public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws SchemaChangeException {
for (OutgoingRecordBatch batch : outgoing) {
logger.debug("Attempting to flush all outgoing batches");
if (isLastBatch)
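
Putting the pieces together, the class this change is meant to generate looks roughly like the following (a sketch reconstructed from the code-model comments above, assuming two destinations and a single Int field; actual generated names, types, and counts depend on the schema and destination list):

    import org.apache.drill.exec.vector.IntVector;
    import org.apache.drill.exec.vector.ValueVector;

    // Illustrative shape of the generated partitioner body.
    public class GeneratedPartitionerSketch {
      private OutgoingRecordBatch[] outgoingBatches;
      private ValueVector[][] outgoingVectors;
      private IntVector vv1;  // one incoming-vector member per field, wired up in setup

      public void doSetup(OutgoingRecordBatch[] outgoing,
                          IntVector in0, IntVector out0, IntVector out1) {
        outgoingBatches = outgoing;
        vv1 = in0;
        // one inner array per destination batch, one slot per field, e.g.
        // new ValueVector[][] { new ValueVector[] {vv1, vv2}, ... }
        outgoingVectors = new ValueVector[][] {
            new ValueVector[] { out0 },   // destination 0's vectors
            new ValueVector[] { out1 }    // destination 1's vectors
        };
      }

      public void doEval(int inIndex, int outIndex) {
        // per field: cast to the concrete vector type and copy one value
        ((IntVector) outgoingVectors[outIndex][0])
            .copyFrom(inIndex, outgoingBatches[outIndex].getRecordCount(), vv1);
        outgoingBatches[outIndex].incRecordCount();
        outgoingBatches[outIndex].flushIfNecessary();
      }
    }

Compared with the switch statement the generator previously emitted (one case block per destination), indexing outgoingVectors[outIndex] keeps the copy routine a fixed size no matter how many destinations the sender has, which is the optimization named in the commit title.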
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
index 4ecbd0e..48d7f5d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -20,17 +20,20 @@ package org.apache.drill.exec.work;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
/**
* Informs remote node as fragment changes state.
*/
-public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFragmentRunnerListener.class);
+public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
private final BitTunnel tunnel;
- public RemoteFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
+ public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
super(context);
this.tunnel = tunnel;
}
@@ -38,7 +41,7 @@ public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- logger.debug("Sending remote status message. {}", status);
+ logger.debug("Sending status change message message to remote node: " + status);
tunnel.sendFragmentStatus(status);
}