Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:45:03 UTC

[25/27] git commit: optimized generated code for partition sender

optimized generated code for partition sender


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ffc674b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ffc674b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ffc674b

Branch: refs/heads/master
Commit: 8ffc674b7ce699aa6905166bf2060bc4e48e45c3
Parents: 4a10ea1
Author: Ben Becker <be...@gmail.com>
Authored: Sat Aug 10 01:20:53 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    |  52 ++++-----
 .../PartitionSenderRootExec.java                | 115 ++++++++++++-------
 .../exec/work/RemoteFragmentRunnerListener.java |  11 +-
 3 files changed, 107 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 4ab598c..b40ce4c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,11 +19,10 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.util.Iterator;
-import java.util.List;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.proto.ExecProtos;
@@ -52,7 +51,6 @@ public class OutgoingRecordBatch implements RecordBatch {
   private RecordBatch incoming;
   private FragmentContext context;
   private BatchSchema outSchema;
-  private List<ValueVector> valueVectors;
   private VectorContainer vectorContainer;
   private int recordCount;
   private int recordCapacity;
@@ -65,29 +63,25 @@ public class OutgoingRecordBatch implements RecordBatch {
     initializeBatch();
   }
 
-  public OutgoingRecordBatch() {  }
-
-  public void init(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context) {
-    this.incoming = incoming;
-    this.context = context;
-    this.operator = operator;
-    this.tunnel = tunnel;
-    resetBatch();
-  }
-
   public void flushIfNecessary() {
-    if (recordCount == recordCapacity - 1) flush();
+    if (recordCount == recordCapacity) logger.debug("Flush is necessary: count is " + recordCount + ", capacity is " + recordCapacity);
+    try {
+      if (recordCount == recordCapacity) flush();
+    } catch (SchemaChangeException e) {
+      // TODO:
+      logger.error("Unable to flush outgoing record batch: " + e);
+    }
   }
 
   public void incRecordCount() {
     ++recordCount;
   }
   
-  public void flush() {
+  public void flush() throws SchemaChangeException {
     if (recordCount == 0) {
       logger.warn("Attempted to flush an empty record batch");
-      return;
     }
+    logger.debug("Flushing record batch.  count is: " + recordCount + ", capacity is " + recordCapacity);
     final ExecProtos.FragmentHandle handle = context.getHandle();
     FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
                                                                     handle.getQueryId(),
@@ -96,23 +90,26 @@ public class OutgoingRecordBatch implements RecordBatch {
                                                                     operator.getOppositeMajorFragmentId(),
                                                                     0,
                                                                     getWritableBatch());
-    tunnel.sendRecordBatch(statusHandler, context, writableBatch);
+    tunnel.sendRecordBatch(statusHandler, context, writableBatch);
 
     // reset values and reallocate the buffer for each value vector.  NOTE: the value vector is directly
     // referenced by generated code and must not be replaced.
     recordCount = 0;
     for (VectorWrapper v : vectorContainer) {
-      getAllocator(TypeHelper.getNewVector(v.getField(), context.getAllocator()),
-                   v.getValueVector()).alloc(recordCapacity);
+      logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector());
+      getAllocator(v.getValueVector(),
+                   TypeHelper.getNewVector(v.getField(), context.getAllocator())).alloc(recordCapacity);
     }
+    if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
   }
 
+
   /**
    * Create a new output schema and allocate space for value vectors based on the incoming record batch.
    */
   public void initializeBatch() {
+    isLast = false;
     recordCapacity = incoming.getRecordCount();
-    valueVectors = Lists.newArrayList();
     vectorContainer = new VectorContainer();
 
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -122,12 +119,13 @@ public class OutgoingRecordBatch implements RecordBatch {
       bldr.addField(v.getField());
 
       // allocate a new value vector
-      vectorContainer.add(v.getValueVector());
       ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
-      getAllocator(outgoingVector, v.getValueVector()).alloc(recordCapacity);
-      valueVectors.add(outgoingVector);
+      getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
+      vectorContainer.add(outgoingVector);
+      logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
     }
     outSchema = bldr.build();
+    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
   }
 
   /**
@@ -135,13 +133,11 @@ public class OutgoingRecordBatch implements RecordBatch {
    * on the incoming record batch.
    */
   public void resetBatch() {
+    isLast = false;
     recordCount = 0;
     recordCapacity = 0;
-    if (valueVectors != null) {
-      for(ValueVector v : valueVectors){
-        v.close();
-      }
-    }
+    for (VectorWrapper v : vectorContainer)
+      v.getValueVector().clear();
     initializeBatch();
   }
 

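For context on the flush contract above: the generated copy code calls incRecordCount() after writing a record and then flushIfNecessary(), which sends the batch exactly when recordCount reaches recordCapacity; flush() keeps the very same ValueVector objects (generated code references them directly) and only reallocates their buffers. Below is a minimal, self-contained toy model of that contract, not Drill code: the RPC send is replaced by a print and all Drill types are stubbed away.

    // Toy sketch of OutgoingRecordBatch's flush-on-full contract.
    // The real flush() wraps the vectors in a FragmentWritableBatch, sends it
    // over a BitTunnel, and reallocates each vector's buffer while keeping the
    // vector objects themselves in place.
    public class ToyOutgoingBatch {
      private final int recordCapacity;
      private int recordCount;

      public ToyOutgoingBatch(int recordCapacity) { this.recordCapacity = recordCapacity; }

      public void incRecordCount() { ++recordCount; }

      public void flushIfNecessary() {
        // mirrors the commit: flush exactly when the batch is full
        if (recordCount == recordCapacity) flush();
      }

      public void flush() {
        System.out.println("sending batch of " + recordCount + " records");
        recordCount = 0;  // buffers would be reallocated here; the objects are kept
      }

      public static void main(String[] args) {
        ToyOutgoingBatch out = new ToyOutgoingBatch(3);
        for (int i = 0; i < 7; ++i) {
          out.incRecordCount();
          out.flushIfNecessary();
        }
        out.flush();  // final partial batch, analogous to the isLast flush
      }
    }
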
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 313ddf3..aa25c96 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.physical.impl.partitionsender;
 
-import com.beust.jcommander.internal.Lists;
 import com.sun.codemodel.*;
 import org.apache.drill.common.expression.*;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -32,9 +31,10 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import java.io.IOException;
-import java.util.List;
 
 class PartitionSenderRootExec implements RootExec {
 
@@ -56,10 +56,10 @@ class PartitionSenderRootExec implements RootExec {
     this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
     int fieldId = 0;
     for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations())
-      outgoing[fieldId] = new OutgoingRecordBatch(operator,
-                             context.getCommunicator().getTunnel(endpoint),
-                             incoming,
-                             context);
+      outgoing[fieldId++] = new OutgoingRecordBatch(operator,
+                                                    context.getCommunicator().getTunnel(endpoint),
+                                                    incoming,
+                                                    context);
     try {
       createPartitioner();
     } catch (SchemaChangeException e) {
@@ -87,19 +87,25 @@ class PartitionSenderRootExec implements RootExec {
           partitioner.partitionBatch(incoming);
 
         // send all pending batches
-        flushOutgoingBatches(true, false);
+        try {
+          flushOutgoingBatches(true, false);
+        } catch (SchemaChangeException e) {
+          incoming.kill();
+          logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
+          context.fail(e);
+          return false;
+        }
         return false;
 
       case OK_NEW_SCHEMA:
-        // send all existing batches
-        flushOutgoingBatches(false, true);
-        // update OutgoingRecordBatch's schema and value vectors
         try {
+          // send all existing batches
+          flushOutgoingBatches(false, true);
+          // update OutgoingRecordBatch's schema and generate partitioning code
           createPartitioner();
-          partitioner.setup(context, incoming, outgoing);
         } catch (SchemaChangeException e) {
           incoming.kill();
-          logger.error("Failed to create partitioning sender during query ", e);
+          logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
           context.fail(e);
           return false;
         }
@@ -133,54 +139,85 @@ class PartitionSenderRootExec implements RootExec {
     }
 
     // generate code to copy from an incoming value vector to the destination partition's outgoing value vector
-    int fieldId = 0;
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
+    JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array();
+    JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array();
     cg.rotateBlock();
 
-    // declare array of record batches for each partition
+    // declare and assign the array of outgoing record batches
     JVar outgoingBatches = cg.clazz.field(JMod.NONE,
-                                          cg.getModel().ref(OutgoingRecordBatch.class).array(),
+                                          outgoingBatchArrayType,
                                           "outgoingBatches");
-
     cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing"));
 
-    // declare incoming value vectors
-    List<JVar> incomingVVs = Lists.newArrayList();
-    for (VectorWrapper<?> vvIn : incoming)
-      incomingVVs.add(cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
-                                                                                       fieldId++,
-                                                                                       vvIn.isHyper())));
+    // declare a two-dimensional array of value vectors; batch is the first dimension, ValueVector is the second
+    JVar outgoingVectors = cg.clazz.field(JMod.NONE,
+                                          outgoingVectorArrayType,
+                                          "outgoingVectors");
 
+    // create 2d array and build initialization list.  For example:
+    //     outgoingVectors = new ValueVector[][] { 
+    //                              new ValueVector[] {vv1, vv2},
+    //                              new ValueVector[] {vv3, vv4}
+    //                       };
+    JArray outgoingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
+
+    int fieldId = 0;
     int batchId = 0;
-    fieldId = 0;
-    // generate switch statement for each destination batch
-    JSwitch switchStatement = cg.getBlock()._switch(outIndex);
     for (OutgoingRecordBatch batch : outgoing) {
 
-      // generate case statement for this batch
-      JBlock caseBlock = switchStatement._case(JExpr.lit(batchId)).body();
-
+      JArray outgoingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class));
       for (VectorWrapper<?> vv : batch) {
-        // declare outgoing value vector and a corresponding counter
+        // declare outgoing value vector and assign it to the array
         JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
                                                          new TypedFieldId(vv.getField().getType(),
                                                                           fieldId,
                                                                           false));
-
-        caseBlock.add(outVV.invoke("copyFrom")
-                              .arg(inIndex)
-                              .arg(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("getRecordCount"))
-                              .arg(incomingVVs.get(fieldId)));
+        // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
+        outgoingVectorInitBatch.add(outVV);
         ++fieldId;
       }
-      caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("incRecordCount"));
-      caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("flushIfNecessary"));
-      fieldId = 0;
-      caseBlock._break();
+
+      // add VV array to initialization list (e.g. new ValueVector[] { ... })
+      outgoingVectorInit.add(outgoingVectorInitBatch);
       ++batchId;
+      fieldId = 0;
     }
 
+    // generate outgoing value vector 2d array initialization list.
+    cg.getSetupBlock().assign(outgoingVectors, outgoingVectorInit);
+
+    for (VectorWrapper<?> vvIn : incoming) {
+      // declare incoming value vectors
+      JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
+                                                                                         fieldId,
+                                                                                         vvIn.isHyper()));
+
+      // generate the copyFrom() invocation with explicit cast to the appropriate type
+      Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
+                                                       vvIn.getField().getType().getMode());
+      JClass vvClass = cg.getModel().ref(vvType);
+      // the following block generates calls to copyFrom(); e.g.:
+      // ((IntVector) outgoingVectors[outIndex][0]).copyFrom(inIndex,
+      //                                                     outgoingBatches[outIndex].getRecordCount(),
+      //                                                     vv1);
+      cg.getBlock().add(
+        ((JExpression) JExpr.cast(vvClass,
+              ((JExpression)
+                     outgoingVectors
+                       .component(outIndex))
+                       .component(JExpr.lit(fieldId))))
+                       .invoke("copyFrom")
+                       .arg(inIndex)
+                       .arg(((JExpression) outgoingBatches.component(outIndex)).invoke("getRecordCount"))
+                       .arg(incomingVV));
+
+      // generate the OutgoingRecordBatch helper invocations
+      cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("incRecordCount"));
+      cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("flushIfNecessary"));
+      ++fieldId;
+    }
     try {
       // compile and setup generated code
       partitioner = context.getImplementationClassMultipleOutput(cg);
@@ -199,7 +236,7 @@ class PartitionSenderRootExec implements RootExec {
    * @param isLastBatch    true if this is the last incoming batch
    * @param schemaChanged  true if the schema has changed
    */
-  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) {
+  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws SchemaChangeException {
     for (OutgoingRecordBatch batch : outgoing) {
       logger.debug("Attempting to flush all outgoing batches");
       if (isLastBatch)

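The substance of the optimization is easiest to see in the shape of the code the generator emits for each incoming record. The sketch below is hand-written, not generator output: vv0 and out0 are placeholder vector names, IntVector stands in for whatever concrete class TypeHelper resolves for the field, and a single projected field is assumed (the commit emits the incRecordCount()/flushIfNecessary() pair once per copied field).

    // Before this commit: one switch case per destination, with the outgoing
    // vectors bound as separate members; the emitted method grows with the
    // number of partitions.
    switch (outIndex) {
      case 0:
        out0.copyFrom(inIndex, outgoingBatches[0].getRecordCount(), vv0);
        outgoingBatches[0].incRecordCount();
        outgoingBatches[0].flushIfNecessary();
        break;
      case 1:
        // ... an identical case body for every other destination
        break;
    }

    // After this commit: the outgoing vectors live in a 2d array indexed by
    // [partition][field], so the copy path is the same few statements no
    // matter how many destinations there are.
    ((IntVector) outgoingVectors[outIndex][0])
        .copyFrom(inIndex, outgoingBatches[outIndex].getRecordCount(), vv0);
    outgoingBatches[outIndex].incRecordCount();
    outgoingBatches[outIndex].flushIfNecessary();
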
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
index 4ecbd0e..48d7f5d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -20,17 +20,20 @@ package org.apache.drill.exec.work;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
 import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
 
 /**
  * Informs remote node as fragment changes state.
  */
-public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFragmentRunnerListener.class);
+public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
   
   private final BitTunnel tunnel;
 
-  public RemoteFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
+  public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
     super(context);
     this.tunnel = tunnel;
   }
@@ -38,7 +41,7 @@ public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener
   
   @Override
   protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-    logger.debug("Sending remote status message. {}", status);
+    logger.debug("Sending status change message message to remote node: " + status);
     tunnel.sendFragmentStatus(status);
   }