You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/16 01:55:27 UTC
git commit: DRILL-299: OutgoingRecordBatch trying to get RecordCount
on incoming batch with outcome NOT_YET
Updated Branches:
refs/heads/master 0e830960f -> b07682084
DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b0768208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b0768208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b0768208
Branch: refs/heads/master
Commit: b07682084da96469e310028e67b365d005f99bdb
Parents: 0e83096
Author: Ben Becker <be...@gmail.com>
Authored: Fri Nov 15 16:35:12 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 15 16:35:12 2013 -0800
----------------------------------------------------------------------
.../partitionsender/OutgoingRecordBatch.java | 41 ++++----------------
.../physical/impl/TestHashToRandomExchange.java | 2 -
2 files changed, 8 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 2647ffc..081b4c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
@@ -50,7 +51,7 @@ import com.google.common.base.Preconditions;
* next() will never be called on this object. When a record batch is ready to send (e.g. nearing size
* limit or schema change), call flush() to send the batch.
*/
-public class OutgoingRecordBatch implements RecordBatch {
+public class OutgoingRecordBatch implements VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class);
private BitTunnel tunnel;
@@ -64,6 +65,8 @@ public class OutgoingRecordBatch implements RecordBatch {
private int recordCount;
private int recordCapacity;
private int oppositeMinorFragmentId;
+ private static int DEFAULT_ALLOC_SIZE = 20000;
+ private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
this.incoming = incoming;
@@ -71,7 +74,6 @@ public class OutgoingRecordBatch implements RecordBatch {
this.operator = operator;
this.tunnel = tunnel;
this.oppositeMinorFragmentId = oppositeMinorFragmentId;
- initializeBatch();
}
public void flushIfNecessary() {
@@ -137,8 +139,8 @@ public class OutgoingRecordBatch implements RecordBatch {
// must remain valid.
recordCount = 0;
for (VectorWrapper<?> v : vectorContainer) {
- logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
- VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
+ logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
+ VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
}
if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
return true;
@@ -150,7 +152,7 @@ public class OutgoingRecordBatch implements RecordBatch {
*/
public void initializeBatch() {
isLast = false;
- recordCapacity = incoming.getRecordCount();
+ recordCapacity = DEFAULT_ALLOC_SIZE;
vectorContainer = new VectorContainer();
SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -186,17 +188,6 @@ public class OutgoingRecordBatch implements RecordBatch {
}
@Override
- public IterOutcome next() {
- assert false;
- return IterOutcome.STOP;
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
public BatchSchema getSchema() {
Preconditions.checkNotNull(outSchema);
return outSchema;
@@ -208,21 +199,6 @@ public class OutgoingRecordBatch implements RecordBatch {
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return vectorContainer.getValueVectorId(path);
}
@@ -237,9 +213,8 @@ public class OutgoingRecordBatch implements RecordBatch {
return vectorContainer.iterator();
}
- @Override
public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
+ return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
index 0cc09b4..bbe1c18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
@@ -38,8 +38,6 @@ import com.google.common.io.Files;
public class TestHashToRandomExchange extends PopUnitTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashToRandomExchange.class);
- //Todo reenable this test once fix for has partition assignments is included
- @Ignore
@Test
public void twoBitTwoExchangeTwoEntryRun() throws Exception {
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();