Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/16 01:55:27 UTC

git commit: DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET

Updated Branches:
  refs/heads/master 0e830960f -> b07682084


DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b0768208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b0768208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b0768208

Branch: refs/heads/master
Commit: b07682084da96469e310028e67b365d005f99bdb
Parents: 0e83096
Author: Ben Becker <be...@gmail.com>
Authored: Fri Nov 15 16:35:12 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 15 16:35:12 2013 -0800

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 41 ++++----------------
 .../physical/impl/TestHashToRandomExchange.java |  2 -
 2 files changed, 8 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
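
For context on the fix: OutgoingRecordBatch previously sized its value vectors from
incoming.getRecordCount(), both in initializeBatch() (which the constructor called) and
when reallocating after a flush. When the upstream batch's last outcome was NOT_YET, no
records have been delivered yet and that count is undefined, which is the failure DRILL-299
describes. The commit removes the dependency on the incoming batch: the constructor no
longer calls initializeBatch(), and allocation uses the fixed defaults DEFAULT_ALLOC_SIZE
(20000) and DEFAULT_VARIABLE_WIDTH_SIZE (2048). The sketch below only illustrates that
pattern; apart from those two constants, every name in it is a hypothetical stand-in rather
than a Drill class.

    // Sketch only: every name here except the two DEFAULT_* constants is a
    // hypothetical stand-in, not a Drill class. It shows why sizing the outgoing
    // vectors from the upstream batch is unsafe when that batch's last outcome
    // was NOT_YET, and what the commit allocates instead.
    public class AllocationSketch {

      // Stand-in for an upstream batch's iteration outcome.
      enum IterOutcome { OK, OK_NEW_SCHEMA, NOT_YET, NONE, STOP }

      // Minimal view of an upstream batch; getRecordCount() is only meaningful
      // after an OK or OK_NEW_SCHEMA outcome.
      interface Upstream {
        IterOutcome lastOutcome();
        int getRecordCount();
      }

      // The fixed defaults introduced by this commit.
      static final int DEFAULT_ALLOC_SIZE = 20000;
      static final int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;

      // Before: capacity was read from the incoming batch, which may not have
      // delivered any records yet.
      static int capacityBefore(Upstream incoming) {
        return incoming.getRecordCount();          // undefined on NOT_YET
      }

      // After: capacity is a fixed default, independent of the incoming batch.
      static int capacityAfter(Upstream incoming) {
        return DEFAULT_ALLOC_SIZE;
      }

      public static void main(String[] args) {
        Upstream notYet = new Upstream() {
          public IterOutcome lastOutcome() { return IterOutcome.NOT_YET; }
          public int getRecordCount() {
            throw new IllegalStateException("no records delivered yet");
          }
        };
        // capacityBefore(notYet) would throw here; capacityAfter is always safe.
        System.out.println("allocating " + capacityAfter(notYet) + " value slots");
      }
    }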


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 2647ffc..081b4c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
@@ -50,7 +51,7 @@ import com.google.common.base.Preconditions;
  * next() will never be called on this object.  When a record batch is ready to send (e.g. nearing size
  * limit or schema change), call flush() to send the batch.
  */
-public class OutgoingRecordBatch implements RecordBatch {
+public class OutgoingRecordBatch implements VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class);
 
   private BitTunnel tunnel;
@@ -64,6 +65,8 @@ public class OutgoingRecordBatch implements RecordBatch {
   private int recordCount;
   private int recordCapacity;
   private int oppositeMinorFragmentId;
+  private static int DEFAULT_ALLOC_SIZE = 20000;
+  private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
 
   public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
     this.incoming = incoming;
@@ -71,7 +74,6 @@ public class OutgoingRecordBatch implements RecordBatch {
     this.operator = operator;
     this.tunnel = tunnel;
     this.oppositeMinorFragmentId = oppositeMinorFragmentId;
-    initializeBatch();
   }
 
   public void flushIfNecessary() {
@@ -137,8 +139,8 @@ public class OutgoingRecordBatch implements RecordBatch {
     // must remain valid.
     recordCount = 0;
     for (VectorWrapper<?> v : vectorContainer) {
-      logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
-      VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
+      logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
+      VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
     return true;
@@ -150,7 +152,7 @@ public class OutgoingRecordBatch implements RecordBatch {
    */
   public void initializeBatch() {
     isLast = false;
-    recordCapacity = incoming.getRecordCount();
+    recordCapacity = DEFAULT_ALLOC_SIZE;
     vectorContainer = new VectorContainer();
 
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -186,17 +188,6 @@ public class OutgoingRecordBatch implements RecordBatch {
   }
 
   @Override
-  public IterOutcome next() {
-    assert false;
-    return IterOutcome.STOP;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
   public BatchSchema getSchema() {
     Preconditions.checkNotNull(outSchema);
     return outSchema;
@@ -208,21 +199,6 @@ public class OutgoingRecordBatch implements RecordBatch {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
     return vectorContainer.getValueVectorId(path);
   }
@@ -237,9 +213,8 @@ public class OutgoingRecordBatch implements RecordBatch {
     return vectorContainer.iterator();
   }
 
-  @Override
   public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
+    return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
   }
 
   
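The interface change follows from the class Javadoc shown above: next() is never called on
an OutgoingRecordBatch; records are copied in, and flush() is called when the batch nears
its size limit or the schema changes. The pull-model methods it previously had to stub out
or forward (next, getContext, kill, the selection vectors) are removed, the class now
implements VectorAccessible instead of RecordBatch, and getWritableBatch() builds the
outgoing batch with WritableBatch.getBatchNoHVWrap(recordCount, this, false). A compile-only
sketch of that copy-then-flush contract, using hypothetical types that only mirror the
shape described:

    // Compile-only sketch; none of these types are Drill's, they only mirror the
    // copy-then-flush contract described in the Javadoc.
    public class FlushContractSketch {

      interface OutgoingBatch {
        boolean copyRecord(int index);   // false once the batch is full
        void flush();                    // send whatever has accumulated
      }

      // Copy each incoming record into its target partition's batch; when a batch
      // reports it is full, flush it and retry the copy.
      static void route(int incomingRecordCount, OutgoingBatch[] partitions) {
        for (int i = 0; i < incomingRecordCount; i++) {
          OutgoingBatch target = partitions[i % partitions.length];  // toy "hash"
          if (!target.copyRecord(i)) {
            target.flush();
            target.copyRecord(i);
          }
        }
      }
    }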

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
index 0cc09b4..bbe1c18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
@@ -38,8 +38,6 @@ import com.google.common.io.Files;
 public class TestHashToRandomExchange extends PopUnitTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashToRandomExchange.class);
 
-  //Todo reenable this test once fix for has partition assignments is included
-  @Ignore
   @Test
   public void twoBitTwoExchangeTwoEntryRun() throws Exception {
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();