You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/01/23 02:56:34 UTC
[2/3] drill git commit: DRILL-1960: Automatic reallocation
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 2fd5ce1..4b8e357 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -17,12 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.aggregate;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.inject.Named;
+
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -46,19 +54,19 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ObjectVector;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
-import javax.inject.Named;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.collect.Lists;
public abstract class HashAggTemplate implements HashAggregator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+ private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+ private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+
private static final boolean EXTRA_DEBUG_1 = false;
private static final boolean EXTRA_DEBUG_2 = false;
private static final String TOO_BIG_ERROR =
@@ -112,6 +120,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
+
public class BatchHolder {
private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
@@ -144,7 +153,7 @@ public abstract class HashAggTemplate implements HashAggregator {
((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
HashTable.BATCH_SIZE);
} else if (vector instanceof ObjectVector) {
- ((ObjectVector)vector).allocateNew(HashTable.BATCH_SIZE);
+ ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
} else {
vector.allocateNew();
}
@@ -165,33 +174,17 @@ public abstract class HashAggTemplate implements HashAggregator {
setupInterior(incoming, outgoing, aggrValuesContainer);
}
- private boolean outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
+ private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
outStartIdxHolder.value = batchOutputCount;
outNumRecordsHolder.value = 0;
- boolean status = true;
-
- // Output records starting from 'batchOutputCount' in current batch until there are no more records
- // or output vectors have no space left. In destination vectors, start filling records from 0th position.
- while(batchOutputCount <= maxOccupiedIdx) {
- if (outputRecordValues(batchOutputCount, outNumRecordsHolder.value)) {
- if (EXTRA_DEBUG_2) {
- logger.debug("Outputting values from input index {} to output index: {}",
- batchOutputCount, outNumRecordsHolder.value);
- }
- batchOutputCount++;
- outNumRecordsHolder.value++;
- } else {
- status = false;
- break;
+ for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
+ outputRecordValues(i, batchOutputCount);
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Outputting values to output index: {}", batchOutputCount);
}
+ batchOutputCount++;
+ outNumRecordsHolder.value++;
}
- // It's not a failure if only some records were output (at least 1) .. since out-of-memory
- // conditions may prevent all records from being output; the caller has the responsibility to
- // allocate more memory and continue outputting more records
- if (!status && outNumRecordsHolder.value > 0) {
- status = true;
- }
- return status;
}
private void clear() {
@@ -218,8 +211,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@RuntimeOverridden
- public boolean outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
- return true;
+ public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
}
}
@@ -303,8 +295,7 @@ public abstract class HashAggTemplate implements HashAggregator {
if (EXTRA_DEBUG_2) {
logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
}
- boolean success = checkGroupAndAggrValues(currentIndex);
- assert success : "HashAgg couldn't copy values.";
+ checkGroupAndAggrValues(currentIndex);
}
if (EXTRA_DEBUG_1) {
@@ -323,57 +314,56 @@ public abstract class HashAggTemplate implements HashAggregator {
logger.debug("Received IterOutcome of {}", out);
}
switch (out) {
- case NOT_YET:
- this.outcome = out;
- return AggOutcome.RETURN_OUTCOME;
-
- case OK_NEW_SCHEMA:
- if (EXTRA_DEBUG_1) {
- logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- }
- newSchema = true;
- this.cleanup();
- // TODO: new schema case needs to be handled appropriately
- return AggOutcome.UPDATE_AGGREGATOR;
-
- case OK:
- resetIndex();
- if (incoming.getRecordCount() == 0) {
- continue;
- } else {
- boolean success = checkGroupAndAggrValues(currentIndex);
- assert success : "HashAgg couldn't copy values.";
- incIndex();
+ case NOT_YET:
+ this.outcome = out;
+ return AggOutcome.RETURN_OUTCOME;
+ case OK_NEW_SCHEMA:
if (EXTRA_DEBUG_1) {
- logger.debug("Continuing outside loop");
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
+ newSchema = true;
+ this.cleanup();
+ // TODO: new schema case needs to be handled appropriately
+ return AggOutcome.UPDATE_AGGREGATOR;
+
+ case OK:
+ resetIndex();
+ if (incoming.getRecordCount() == 0) {
+ continue;
+ } else {
+ checkGroupAndAggrValues(currentIndex);
+ incIndex();
+
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Continuing outside loop");
+ }
+ continue outside;
}
- continue outside;
- }
- case NONE:
- // outcome = out;
+ case NONE:
+ // outcome = out;
- buildComplete = true;
+ buildComplete = true;
- updateStats(htable);
+ updateStats(htable);
- // output the first batch; remaining batches will be output
- // in response to each next() call by a downstream operator
+ // output the first batch; remaining batches will be output
+ // in response to each next() call by a downstream operator
- outputCurrentBatch();
+ outputCurrentBatch();
- // cleanup incoming batch since output of aggregation does not need
- // any references to the incoming
+ // cleanup incoming batch since output of aggregation does not need
+ // any references to the incoming
- incoming.cleanup();
- // return setOkAndReturn();
- return AggOutcome.RETURN_OUTCOME;
+ incoming.cleanup();
+ // return setOkAndReturn();
+ return AggOutcome.RETURN_OUTCOME;
- case STOP:
- default:
- outcome = out;
- return AggOutcome.CLEANUP_AND_RETURN;
+ case STOP:
+ default:
+ outcome = out;
+ return AggOutcome.CLEANUP_AND_RETURN;
}
}
@@ -388,7 +378,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- private void allocateOutgoing() {
+ private void allocateOutgoing(int records) {
// Skip the keys and only allocate for outputting the workspace values
// (keys will be output through splitAndTransfer)
Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
@@ -397,7 +387,12 @@ public abstract class HashAggTemplate implements HashAggregator {
}
while (outgoingIter.hasNext()) {
ValueVector vv = outgoingIter.next().getValueVector();
- vv.allocateNew();
+ MajorType type = vv.getField().getType();
+ if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+ vv.allocateNew();
+ } else {
+ AllocationHelper.allocate(vv, records, 1);
+ }
}
}
@@ -483,72 +478,40 @@ public abstract class HashAggTemplate implements HashAggregator {
return outcome;
}
- allocateOutgoing();
+ allocateOutgoing(numPendingOutput);
- boolean outputKeysStatus = true;
- boolean outputValuesStatus = true;
-
- outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
int numOutputRecords = outNumRecordsHolder.value;
if (EXTRA_DEBUG_1) {
- logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value,
- outNumRecordsHolder.value);
- }
- if (outputValuesStatus) {
- outputKeysStatus =
- this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+ logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
}
+ this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
- if (outputKeysStatus && outputValuesStatus) {
-
- // set the value count for outgoing batch value vectors
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(numOutputRecords);
- }
+ // set the value count for outgoing batch value vectors
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(numOutputRecords);
+ }
- outputCount += numOutputRecords;
+ outputCount += numOutputRecords;
- if (first) {
- this.outcome = IterOutcome.OK_NEW_SCHEMA;
- } else {
- this.outcome = IterOutcome.OK;
- }
+ if (first) {
+ this.outcome = IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ this.outcome = IterOutcome.OK;
+ }
- logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+ logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
- lastBatchOutputCount = numOutputRecords;
- // If there are no more records to output, go to the next batch. If there are any records left refer to the
- // same BatchHolder. Remaining records will be outputted in next outputCurrentBatch() call(s).
- if (batchHolders.get(outBatchIndex).getNumPendingOutput() == 0) {
- outBatchIndex++;
- }
- if (outBatchIndex == batchHolders.size()) {
- allFlushed = true;
+ lastBatchOutputCount = numOutputRecords;
+ outBatchIndex++;
+ if (outBatchIndex == batchHolders.size()) {
+ allFlushed = true;
- logger.debug("HashAggregate: All batches flushed.");
+ logger.debug("HashAggregate: All batches flushed.");
- // cleanup my internal state since there is nothing more to return
- this.cleanup();
- }
- } else {
- if (!outputKeysStatus) {
- logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
- for (VectorWrapper<?> v : outContainer) {
- logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
- v.getValueVector().getValueCapacity());
- }
- context.fail(new Exception("Failed to output keys for current batch !"));
- }
- if (!outputValuesStatus) {
- logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
- for (VectorWrapper<?> v : outContainer) {
- logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
- v.getValueVector().getValueCapacity());
- }
- context.fail(new Exception("Failed to output values for current batch !"));
- }
- this.outcome = IterOutcome.STOP;
+ // cleanup my internal state since there is nothing more to return
+ this.cleanup();
}
return this.outcome;
@@ -569,72 +532,45 @@ public abstract class HashAggTemplate implements HashAggregator {
// Check if a group is present in the hash table; if not, insert it in the hash table.
// The htIdxHolder contains the index of the group in the hash table container; this same
// index is also used for the aggregation values maintained by the hash aggregate.
- private boolean checkGroupAndAggrValues(int incomingRowIdx) {
+ private void checkGroupAndAggrValues(int incomingRowIdx) {
if (incomingRowIdx < 0) {
throw new IllegalArgumentException("Invalid incoming row index.");
}
/** for debugging
- Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
- BigIntVector vv0 = null;
- BigIntHolder holder = null;
+ Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
+ BigIntVector vv0 = null;
+ BigIntHolder holder = null;
- if (tmp != null) {
- vv0 = ((BigIntVector) tmp);
- holder = new BigIntHolder();
- holder.value = vv0.getAccessor().get(incomingRowIdx) ;
- }
- */
-
- HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
-
- if (putStatus != HashTable.PutStatus.PUT_FAILED) {
- int currentIdx = htIdxHolder.value;
-
- // get the batch index and index within the batch
- if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
- addBatchHolder();
- }
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
- int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
-
- // Check if we have almost filled up the workspace vectors and add a batch if necessary
- if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
- htable.addNewKeyBatch();
- addBatchHolder();
- bh.allocatedNextBatch = true;
- }
+ if (tmp != null) {
+ vv0 = ((BigIntVector) tmp);
+ holder = new BigIntHolder();
+ holder.value = vv0.getAccessor().get(incomingRowIdx) ;
+ }
+ */
+ htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
- if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
- if (EXTRA_DEBUG_2) {
- logger.debug("Group-by key already present in hash table, updating the aggregate values");
- }
+ int currentIdx = htIdxHolder.value;
- // debugging
- //if (holder.value == 100018 || holder.value == 100021) {
- // logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ;
- //}
+ // get the batch index and index within the batch
+ if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
+ addBatchHolder();
+ }
+ BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+ int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
- } else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
- if (EXTRA_DEBUG_2) {
- logger.debug("Group-by key was added to hash table, inserting new aggregate values");
- }
+ // Check if we have almost filled up the workspace vectors and add a batch if necessary
+ if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
+ htable.addNewKeyBatch();
+ addBatchHolder();
+ bh.allocatedNextBatch = true;
+ }
- // debugging
- // if (holder.value == 100018 || holder.value == 100021) {
- // logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ;
- //}
- }
- if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
- numGroupedRecords++;
- return true;
- }
+ if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
+ numGroupedRecords++;
}
-
- logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = {}.", incomingRowIdx, htable.size());
- return false;
}
private void updateStats(HashTable htable) {
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 0f7f394..7cc43ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.aggregate;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -31,9 +34,6 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
-import java.io.IOException;
-import java.util.List;
-
public interface HashAggregator {
public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ef85a36..860627d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -284,9 +284,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
cg.setMappingSet(EVAL);
for (LogicalExpression ex : valueExprs) {
HoldingContainer hc = cg.addExpr(ex);
- cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
- cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
@@ -295,9 +293,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
cg.setMappingSet(RECORD_KEYS);
for (int i =0; i < keyExprs.length; i++) {
HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
- cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
- cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
private final GeneratorMapping PREVIOUS_KEYS_OUT = GeneratorMapping.create("setupInterior", "outputRecordKeysPrev", null, null);
@@ -317,10 +313,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
HoldingContainer innerExpression = cg.addExpr(keyExprs[i], false);
cg.setMappingSet(RECORD_KEYS_PREV_OUT);
HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false);
- cg.getBlock(BlockType.EVAL)._if(outerExpression.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
- cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
private void getIndex(ClassGenerator<StreamingAggregator> g) {
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 556b260..14e6aff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -120,9 +120,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
// pick up a remainder batch if we have one.
if (remainderBatch != null) {
- if (!outputToBatch( previousIndex )) {
- return tooBigFailure();
- }
+ outputToBatch( previousIndex );
remainderBatch.clear();
remainderBatch = null;
return setOkAndReturn();
@@ -136,9 +134,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (EXTRA_DEBUG) {
logger.debug("Attempting to output remainder.");
}
- if (!outputToBatch( previousIndex)) {
- return tooBigFailure();
- }
+ outputToBatch( previousIndex);
}
if (newSchema) {
@@ -171,26 +167,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (EXTRA_DEBUG) {
logger.debug("Values were different, outputting previous batch.");
}
- if (outputToBatch(previousIndex)) {
- if (EXTRA_DEBUG) {
- logger.debug("Output successful.");
- }
- addRecordInc(currentIndex);
- } else {
- if (EXTRA_DEBUG) {
- logger.debug("Output failed.");
- }
- if (outputCount == 0) {
- return tooBigFailure();
- }
-
- // mark the pending output but move forward for the next cycle.
- pendingOutput = true;
- previousIndex = currentIndex;
- incIndex();
- return setOkAndReturn();
-
+ outputToBatch(previousIndex);
+ if (EXTRA_DEBUG) {
+ logger.debug("Output successful.");
}
+ addRecordInc(currentIndex);
}
previousIndex = currentIndex;
}
@@ -215,9 +196,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (first && addedRecordCount == 0) {
return setOkAndReturn();
} else if(addedRecordCount > 0) {
- if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
- remainderBatch = previous;
- }
+ outputToBatchPrev( previous, previousIndex, outputCount);
if (EXTRA_DEBUG) {
logger.debug("Received no more batches, returning.");
}
@@ -239,9 +218,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
}
if (addedRecordCount > 0) {
- if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
- remainderBatch = previous;
- }
+ outputToBatchPrev( previous, previousIndex, outputCount);
if (EXTRA_DEBUG) {
logger.debug("Wrote out end of previous batch, returning.");
}
@@ -272,10 +249,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
previousIndex = currentIndex;
if (addedRecordCount > 0) {
- if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
- remainderBatch = previous;
- return setOkAndReturn();
- }
+ outputToBatchPrev( previous, previousIndex, outputCount);
continue outside;
}
}
@@ -329,21 +303,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.RETURN_OUTCOME;
}
- private final boolean outputToBatch(int inIndex) {
+ private final void outputToBatch(int inIndex) {
- if (!outputRecordKeys(inIndex, outputCount)) {
- if(EXTRA_DEBUG) {
- logger.debug("Failure while outputting keys {}", outputCount);
- }
- return false;
- }
+ outputRecordKeys(inIndex, outputCount);
- if (!outputRecordValues(outputCount)) {
- if (EXTRA_DEBUG) {
- logger.debug("Failure while outputting values {}", outputCount);
- }
- return false;
- }
+ outputRecordValues(outputCount);
if (EXTRA_DEBUG) {
logger.debug("{} values output successfully", outputCount);
@@ -351,20 +315,15 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
resetValues();
outputCount++;
addedRecordCount = 0;
- return true;
}
- private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
- boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
- && outputRecordValues(outIndex) //
- && resetValues();
- if (success) {
- resetValues();
- outputCount++;
- addedRecordCount = 0;
- }
-
- return success;
+ private final void outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+ outputRecordKeysPrev(b1, inIndex, outIndex);
+ outputRecordValues(outIndex);
+ resetValues();
+ resetValues();
+ outputCount++;
+ addedRecordCount = 0;
}
private void addRecordInc(int index) {
@@ -383,9 +342,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
public abstract void addRecord(@Named("index") int index);
- public abstract boolean outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
- public abstract boolean outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
- public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+ public abstract void outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract void outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
+ public abstract void outputRecordValues(@Named("outIndex") int outIndex);
public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
public abstract boolean resetValues();
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index b5cfdca..fd6a3e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -17,13 +17,19 @@
*/
package org.apache.drill.exec.physical.impl.common;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
@@ -45,11 +51,11 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.resolver.TypeCastRules;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
public class ChainedHashTable {
@@ -64,8 +70,8 @@ public class ChainedHashTable {
null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_BUILD =
- GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */,
- null /* cleanup */);
+ GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
+ null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_PROBE =
GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */,
@@ -119,7 +125,7 @@ public class ChainedHashTable {
private final boolean areNullsEqual;
public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
- RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, boolean areNullsEqual) {
+ RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, boolean areNullsEqual) {
this.htConfig = htConfig;
this.context = context;
@@ -150,8 +156,7 @@ public class ChainedHashTable {
int i = 0;
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
- final LogicalExpression expr =
- ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
@@ -171,9 +176,7 @@ public class ChainedHashTable {
if (isProbe) {
i = 0;
for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
- final LogicalExpression expr =
- ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector,
- context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
@@ -218,9 +221,9 @@ public class ChainedHashTable {
}
- private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping,
- MappingSet htableMapping, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws
- SchemaChangeException {
+ private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
+ LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds)
+ throws SchemaChangeException {
cg.setMappingSet(incomingMapping);
if (keyExprs == null || keyExprs.length == 0) {
@@ -261,37 +264,31 @@ public class ChainedHashTable {
}
private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs,
- TypedFieldId[] htKeyFieldIds) throws SchemaChangeException {
+ TypedFieldId[] htKeyFieldIds) throws SchemaChangeException {
cg.setMappingSet(SetValueMapping);
int i = 0;
for (LogicalExpression expr : keyExprs) {
- ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true);
+ boolean useSetSafe = !Types.isFixedWidthType(expr.getMajorType()) || Types.isRepeated(expr.getMajorType());
+ ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, useSetSafe);
- HoldingContainer hc = cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx
}
-
- cg.getEvalBlock()._return(JExpr.TRUE);
}
- private void setupOutputRecordKeys(ClassGenerator<HashTable> cg, TypedFieldId[] htKeyFieldIds,
- TypedFieldId[] outKeyFieldIds) {
+ private void setupOutputRecordKeys(ClassGenerator<HashTable> cg, TypedFieldId[] htKeyFieldIds, TypedFieldId[] outKeyFieldIds) {
cg.setMappingSet(OutputRecordKeysMapping);
if (outKeyFieldIds != null) {
for (int i = 0; i < outKeyFieldIds.length; i++) {
ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
- ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
- HoldingContainer hc = cg.addExpr(vvwExpr);
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ boolean useSetSafe = !Types.isFixedWidthType(vvrExpr.getMajorType()) || Types.isRepeated(vvrExpr.getMajorType());
+ ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, useSetSafe);
+ cg.addExpr(vvwExpr);
}
- cg.getEvalBlock()._return(JExpr.TRUE);
- } else {
- cg.getEvalBlock()._return(JExpr.FALSE);
}
}
@@ -320,22 +317,18 @@ public class ChainedHashTable {
if (result == null) {
throw new DrillRuntimeException(String.format("Join conditions cannot be compared failing build " +
- "expression:" + " %s failing probe expression: %s", buildExpr.getMajorType().toString(),
+ "expression:" + " %s failing probe expression: %s", buildExpr.getMajorType().toString(),
probeExpr.getMajorType().toString()));
} else if (result != buildType) {
// Add a cast expression on top of the build expression
- LogicalExpression castExpr =
- ExpressionTreeMaterializer.addCastExpression(buildExpr, probeExpr.getMajorType(),
- context.getFunctionRegistry(), errorCollector);
+ LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(buildExpr, probeExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
// Store the newly casted expression
keyExprsBuild[i] =
ExpressionTreeMaterializer.materialize(castExpr, incomingBuild, errorCollector,
context.getFunctionRegistry());
} else if (result != probeType) {
// Add a cast expression on top of the probe expression
- LogicalExpression castExpr =
- ExpressionTreeMaterializer.addCastExpression(probeExpr, buildExpr.getMajorType(),
- context.getFunctionRegistry(), errorCollector);
+ LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(probeExpr, buildExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
// store the newly casted expression
keyExprsProbe[i] =
ExpressionTreeMaterializer.materialize(castExpr, incomingProbe, errorCollector,
@@ -346,7 +339,7 @@ public class ChainedHashTable {
}
private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, LogicalExpression[] keyExprs,
- boolean isProbe) throws SchemaChangeException {
+ boolean isProbe) throws SchemaChangeException {
cg.setMappingSet(incomingMapping);
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1ec74bf..ef7dadf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -60,7 +60,7 @@ public interface HashTable {
public void updateBatches();
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
+ public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
public int containsKey(int incomingRowIdx, boolean isProbe);
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index ba980d7..0908e50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -17,7 +17,13 @@
*/
package org.apache.drill.exec.physical.impl.common;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.inject.Named;
+
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -29,16 +35,13 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
-import javax.inject.Named;
-import java.util.ArrayList;
-import java.util.Iterator;
-
public abstract class HashTableTemplate implements HashTable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
@@ -116,6 +119,7 @@ public abstract class HashTableTemplate implements HashTable {
private int batchIndex = 0;
private BatchHolder(int idx) {
+
this.batchIndex = idx;
htContainer = new VectorContainer();
@@ -188,13 +192,10 @@ public abstract class HashTableTemplate implements HashTable {
// Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
// container at the specified index
- private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch,
- int lastEntryIdxWithinBatch) {
+ private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
int currentIdxWithinBatch = currentIdx & BATCH_MASK;
- if (!setValue(incomingRowIdx, currentIdxWithinBatch)) {
- return false;
- }
+ setValue(incomingRowIdx, currentIdxWithinBatch);
// the previous entry in this hash chain should now point to the entry in this currentIdx
if (lastEntryBatch != null) {
@@ -212,8 +213,6 @@ public abstract class HashTableTemplate implements HashTable {
logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.",
incomingRowIdx, currentIdx, hashValue);
}
-
- return true;
}
private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) {
@@ -222,8 +221,7 @@ public abstract class HashTableTemplate implements HashTable {
private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) {
- logger.debug("Rehashing entries within the batch: {}; batchStartIdx = {}, total numBuckets in hash table = {}" +
- ".", batchIndex, batchStartIdx, numbuckets);
+ logger.debug("Rehashing entries within the batch: {}; batchStartIdx = {}, total numBuckets in hash table = {}.", batchIndex, batchStartIdx, numbuckets);
int size = links.getAccessor().getValueCount();
IntVector newLinks = allocMetadataVector(size, EMPTY_SLOT);
@@ -411,14 +409,13 @@ public abstract class HashTableTemplate implements HashTable {
}
@RuntimeOverridden
- protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
- return false;
+ protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
}
@RuntimeOverridden
- protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
- return false;
+ protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
}
+
} // class BatchHolder
@@ -530,25 +527,13 @@ public abstract class HashTableTemplate implements HashTable {
return rounded;
}
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
- HashTable.PutStatus putStatus = put(incomingRowIdx, htIdxHolder);
- int count = retryCount;
- int numBatchHolders;
- while (putStatus == PutStatus.PUT_FAILED && count > 0) {
- logger.debug("Put into hash table failed .. Retrying with new batch holder...");
- numBatchHolders = batchHolders.size();
- this.addBatchHolder();
- freeIndex = numBatchHolders * BATCH_SIZE;
- putStatus = put(incomingRowIdx, htIdxHolder);
- count--;
- }
- return putStatus;
+ public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
+ put(incomingRowIdx, htIdxHolder);
}
private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
int hash = getHashBuild(incomingRowIdx);
- hash = Math.abs(hash);
int i = getBucketIndex(hash, numBuckets());
int startIdx = startIndices.getAccessor().get(i);
int currentIdx;
@@ -569,14 +554,11 @@ public abstract class HashTableTemplate implements HashTable {
incomingRowIdx, currentIdx);
}
- if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
- // update the start index array
- boolean status = startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
- assert status : "Unable to set start indices in the hash table.";
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
- }
- return PutStatus.PUT_FAILED;
+ insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
+ // update the start index array
+ startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
+ htIdxHolder.value = currentIdx;
+ return PutStatus.KEY_ADDED;
}
currentIdx = startIdx;
@@ -610,49 +592,38 @@ public abstract class HashTableTemplate implements HashTable {
addBatchIfNeeded(currentIdx);
if (EXTRA_DEBUG) {
- logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.",
- incomingRowIdx, currentIdx);
+ logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
}
- if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
- } else {
- return PutStatus.PUT_FAILED;
- }
+ insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
+ htIdxHolder.value = currentIdx;
+ return PutStatus.KEY_ADDED;
}
return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
}
- private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch,
- int lastEntryIdx) {
+ private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
addBatchIfNeeded(currentIdx);
BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
- if (bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx)) {
- numEntries++;
+ bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx);
+ numEntries++;
/* Resize hash table if needed and transfer the metadata
* Resize only after inserting the current entry into the hash table
* Otherwise our calculated lastEntryBatch and lastEntryIdx
* becomes invalid after resize.
*/
- resizeAndRehashIfNeeded();
-
- return true;
- }
-
- return false;
+ resizeAndRehashIfNeeded();
}
// Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
@Override
public int containsKey(int incomingRowIdx, boolean isProbe) {
int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
- hash = Math.abs(hash);
int i = getBucketIndex(hash, numBuckets());
int currentIdx = startIndices.getAccessor().get(i);
@@ -764,7 +735,6 @@ public abstract class HashTableTemplate implements HashTable {
public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
assert batchIdx < batchHolders.size();
-
if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) {
return false;
}
@@ -788,8 +758,7 @@ public abstract class HashTableTemplate implements HashTable {
}
// These methods will be code-generated in the context of the outer class
- protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild,
- @Named("incomingProbe") RecordBatch incomingProbe);
+ protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx);
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index e82dd29..02c154f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -351,7 +351,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
logger.debug("Added eval for project expression.");
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f370dc7..4af0292 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,9 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -56,10 +57,15 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.eigenbase.rel.JoinRelType;
-import java.io.IOException;
-import java.util.List;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+
// Probe side record batch
private final RecordBatch left;
@@ -107,18 +113,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */,
null /* cleanup */);
// Generator mapping for the build side : constant
- private static final GeneratorMapping PROJECT_BUILD_CONSTANT =
- GeneratorMapping.create("doSetup"/* setup method */, "doSetup" /* eval method */, null /* reset */,
- null /* cleanup */);
+ private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */,
+ "doSetup" /* eval method */,
+ null /* reset */, null /* cleanup */);
// Generator mapping for the probe side : scalar
private static final GeneratorMapping PROJECT_PROBE =
GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval method */, null /* reset */,
null /* cleanup */);
// Generator mapping for the probe side : constant
- private static final GeneratorMapping PROJECT_PROBE_CONSTANT =
- GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */,
- null /* cleanup */);
+ private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */,
+ "doSetup" /* eval method */,
+ null /* reset */, null /* cleanup */);
// Mapping set for the build side
@@ -127,9 +133,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
"outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD);
// Mapping set for the probe side
- private final MappingSet projectProbeMapping =
- new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */, "probeBatch" /* read container */,
- "outgoing" /* write container */, PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+ private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+ "probeBatch" /* read container */,
+ "outgoing" /* write container */,
+ PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+
+ // indicates if we have previously returned an output batch
+ boolean firstOutputBatch = true;
IterOutcome leftUpstream = IterOutcome.NONE;
IterOutcome rightUpstream = IterOutcome.NONE;
@@ -156,6 +166,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return outputRecords;
}
+
@Override
protected void buildSchema() throws SchemaChangeException {
leftUpstream = next(left);
@@ -341,9 +352,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
case OK:
int currentRecordCount = right.getRecordCount();
- /* For every new build batch, we store some state in the helper context
- * Add new state to the helper context
- */
+ /* For every new build batch, we store some state in the helper context
+ * Add new state to the helper context
+ */
hjHelper.addNewBatch(currentRecordCount);
// Holder contains the global index where the key is hashed into using the hash table
@@ -352,21 +363,19 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// For every record in the build batch, hash the key columns
for (int i = 0; i < currentRecordCount; i++) {
- HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry count */);
+ hashTable.put(i, htIndex, 1 /* retry count */);
- if (status != HashTable.PutStatus.PUT_FAILED) {
- /* Use the global index returned by the hash table, to store
- * the current record index and batch index. This will be used
- * later when we probe and find a match.
- */
- hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
- }
+ /* Use the global index returned by the hash table, to store
+ * the current record index and batch index. This will be used
+ * later when we probe and find a match.
+ */
+ hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
}
- /* Completed hashing all records in this batch. Transfer the batch
- * to the hyper vector container. Will be used when we want to retrieve
- * records that have matching keys on the probe side.
- */
+ /* Completed hashing all records in this batch. Transfer the batch
+ * to the hyper vector container. Will be used when we want to retrieve
+ * records that have matching keys on the probe side.
+ */
RecordBatchData nextBatch = new RecordBatchData(right);
if (hyperContainer == null) {
hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
@@ -386,8 +395,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
- final CodeGenerator<HashJoinProbe> cg =
- CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<HashJoinProbe> g = cg.getRoot();
// Generate the code to project build side records
@@ -411,20 +419,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
// Add the vector to our output container
- // ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(),
- // outputType), context.getAllocator());
container.addOrGet(MaterializedField.create(field.getPath(), outputType));
JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
- .arg(outIndex).arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+ g.getEvalBlock().add(outVV.invoke("copyFromSafe")
+ .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
fieldId++;
}
}
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
// Generate the code to project probe side records
g.setMappingSet(projectProbeMapping);
@@ -432,6 +438,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
int outputFieldId = fieldId;
fieldId = 0;
JExpression probeIndex = JExpr.direct("probeIndex");
+ int recordCount = 0;
if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
for (VectorWrapper<?> vv : left) {
@@ -453,15 +460,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()
- ._return(JExpr.FALSE);
+ g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
fieldId++;
outputFieldId++;
}
+ recordCount = left.getRecordCount();
}
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
HashJoinProbe hj = context.getImplementationClass(cg);
@@ -518,4 +523,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
right.cleanup();
left.cleanup();
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 7599f9e..42c7010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -49,6 +49,6 @@ public interface HashJoinProbe {
JoinRelType joinRelType);
public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
- public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+ public abstract void projectBuildRecord(int buildIndex, int outIndex);
+ public abstract void projectProbeRecord(int probeIndex, int outIndex);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c58f9a3..dcf73b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -98,18 +98,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
}
public void executeProjectRightPhase() {
- boolean success = true;
while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
- success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
- if (success) {
- recordsProcessed++;
- outputRecords++;
- } else {
- if (outputRecords == 0) {
- throw new IllegalStateException("Too big to fail.");
- }
- break;
- }
+ projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
+ recordsProcessed++;
+ outputRecords++;
}
}
@@ -178,64 +170,38 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
*/
hjHelper.setRecordMatched(currentCompositeIdx);
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
- && projectProbeRecord(recordsProcessed, outputRecords);
- if (!success) {
- // we failed to project. redo this record.
- getNextRecord = false;
- return;
+ projectBuildRecord(currentCompositeIdx, outputRecords);
+ projectProbeRecord(recordsProcessed, outputRecords);
+ outputRecords++;
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
} else {
- outputRecords++;
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
*/
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
+ getNextRecord = false;
}
-
} else { // No matching key
// If we have a left outer join, project the keys
if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- boolean success = projectProbeRecord(recordsProcessed, outputRecords);
- if (!success) {
- if (outputRecords == 0) {
- throw new IllegalStateException("Record larger than single batch.");
- } else {
- // we've output some records but failed to output this one. return and wait for next call.
- return;
- }
- }
- assert success;
+ projectProbeRecord(recordsProcessed, outputRecords);
outputRecords++;
}
recordsProcessed++;
}
} else {
hjHelper.setRecordMatched(currentCompositeIdx);
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
- && projectProbeRecord(recordsProcessed, outputRecords);
- if (!success) {
- if (outputRecords == 0) {
- throw new IllegalStateException("Record larger than single batch.");
- } else {
- // we've output some records but failed to output this one. return and wait for next call.
- return;
- }
- }
- assert success;
+ projectBuildRecord(currentCompositeIdx, outputRecords);
+ projectProbeRecord(recordsProcessed, outputRecords);
outputRecords++;
currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
@@ -276,8 +242,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
@Named("outgoing") RecordBatch outgoing);
- public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+ public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
- public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+ public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index c1dffc1..48a0996 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -75,6 +75,8 @@ import org.eigenbase.rel.JoinRelType;
*/
public abstract class JoinTemplate implements JoinWorker {
+ private static final int OUTPUT_BATCH_SIZE = 32*1024;
+
@Override
public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
doSetup(context, status, outgoing);
@@ -86,7 +88,7 @@ public abstract class JoinTemplate implements JoinWorker {
* @return true if join succeeded; false if the worker needs to be regenerated
*/
public final boolean doJoin(final JoinStatus status) {
- while (true) {
+ for (int i = 0; i < OUTPUT_BATCH_SIZE; i++) {
// for each record
// validate input iterators (advancing to the next record batch if necessary)
@@ -94,9 +96,7 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
- return false;
- }
+ doCopyLeft(status.getLeftPosition(), status.getOutPosition());
status.incOutputPos();
status.advanceLeft();
@@ -114,9 +114,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
- return false;
- }
+ doCopyLeft(status.getLeftPosition(), status.getOutPosition());
status.incOutputPos();
}
status.advanceLeft();
@@ -142,13 +140,9 @@ public abstract class JoinTemplate implements JoinWorker {
int initialRightPosition = status.getRightPosition();
do {
// copy all equal right keys to the output record batch
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
- return false;
- }
+ doCopyLeft(status.getLeftPosition(), status.getOutPosition());
- if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) {
- return false;
- }
+ doCopyRight(status.getRightPosition(), status.getOutPosition());
status.incOutputPos();
@@ -197,6 +191,7 @@ public abstract class JoinTemplate implements JoinWorker {
throw new IllegalStateException();
}
}
+ return false;
}
// Generated Methods
@@ -213,8 +208,8 @@ public abstract class JoinTemplate implements JoinWorker {
* @param outIndex position of the output record batch
* @return Whether or not the data was copied.
*/
- public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
- public abstract boolean doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
+ public abstract void doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+ public abstract void doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d0f9d7d..8a6e1f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -376,16 +376,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(outputType,vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+ cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
.arg(copyLeftMapping.getValueReadIndex())
.arg(copyLeftMapping.getValueWriteIndex())
- .arg(vvIn).eq(JExpr.FALSE))
- ._then()
- ._return(JExpr.FALSE);
+ .arg(vvIn));
++vectorId;
}
}
- cg.getEvalBlock()._return(JExpr.lit(true));
// generate copyRight()
///////////////////////
@@ -406,16 +403,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(outputType,vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+ cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
.arg(copyRightMappping.getValueReadIndex())
.arg(copyRightMappping.getValueWriteIndex())
- .arg(vvIn).eq(JExpr.FALSE))
- ._then()
- ._return(JExpr.FALSE);
+ .arg(vvIn));
++vectorId;
}
}
- cg.getEvalBlock()._return(JExpr.lit(true));
JoinWorker w = context.getImplementationClass(cg);
w.setupJoin(context, status, this.container);
@@ -469,7 +463,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
for (VectorWrapper w : container) {
- AllocationHelper.allocate(w.getValueVector(), 5000, 50);
+ AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
}
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
index 2885c52..f2a95b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
@@ -34,7 +34,7 @@ public interface MergingReceiverGeneratorBase {
public abstract int doEval(int leftIndex,
int rightIndex);
- public abstract boolean doCopy(int inIndex, int outIndex);
+ public abstract void doCopy(int inIndex, int outIndex);
public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION =
new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
index c29ef75..537ae74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
@@ -35,5 +35,5 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato
public abstract int doEval(@Named("leftIndex") int leftIndex,
@Named("rightIndex") int rightIndex);
- public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index acbb755..d78ba8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -71,7 +71,9 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.eigenbase.rel.RelFieldCollation.Direction;
@@ -90,6 +92,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+ private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
private RecordBatchLoader[] batchLoaders;
private RawFragmentBatchProvider[] fragProviders;
@@ -266,9 +269,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// allocate a new value vector
ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
- outgoingVector.allocateNew();
++vectorCount;
}
+ allocateOutgoing();
schema = bldr.build();
@@ -559,7 +562,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
private void allocateOutgoing() {
- outgoingContainer.allocateNew();
+ for (VectorWrapper w : outgoingContainer) {
+ ValueVector v = w.getValueVector();
+ if (v instanceof FixedWidthVector) {
+ AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
+ } else {
+ v.allocateNewSafe();
+ }
+ }
}
// private boolean isOutgoingFull() {
@@ -648,12 +658,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
*/
private boolean copyRecordToOutgoingBatch(Node node) {
int inIndex = (node.batchId << 16) + node.valueIndex;
- if (!merger.doCopy(inIndex, outgoingPosition)) {
+ merger.doCopy(inIndex, outgoingPosition);
+ outgoingPosition++;
+ if (outgoingPosition == OUTGOING_BATCH_SIZE) {
return false;
- } else {
- outgoingPosition++;
- return true;
}
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index f5068b4..3c4e9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -66,9 +66,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
int counter = 0;
for (int i = 0; i < countN; i++, firstOutputIndex++) {
int partition = getPartition(i);
- if (!partitionValues.getMutator().setSafe(i, partition)) {
- throw new RuntimeException();
- }
+ partitionValues.getMutator().setSafe(i, partition);
counter++;
}
for(TransferPair t : transfers){
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 20f6195..4292c09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -148,15 +148,7 @@ public abstract class PartitionerTemplate implements Partitioner {
case NONE:
for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId));
- if (!outgoingBatch.copy(recordId)) {
- logger.trace(REWRITE_MSG, recordId);
- outgoingBatch.flush();
- if (!outgoingBatch.copy(recordId)) {
- String msg = composeTooBigMsg(recordId, incoming);
- logger.debug(msg);
- throw new IOException(msg);
- }
- }
+ outgoingBatch.copy(recordId);
}
break;
@@ -164,15 +156,7 @@ public abstract class PartitionerTemplate implements Partitioner {
for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
int svIndex = sv2.getIndex(recordId);
OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
- if (!outgoingBatch.copy(svIndex)) {
- logger.trace(REWRITE_MSG, svIndex);
- outgoingBatch.flush();
- if (!outgoingBatch.copy(svIndex)) {
- String msg = composeTooBigMsg(recordId, incoming);
- logger.debug(msg);
- throw new IOException(msg);
- }
- }
+ outgoingBatch.copy(svIndex);
}
break;
@@ -180,15 +164,7 @@ public abstract class PartitionerTemplate implements Partitioner {
for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
int svIndex = sv4.get(recordId);
OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
- if (!outgoingBatch.copy(svIndex)) {
- logger.trace(REWRITE_MSG, svIndex);
- outgoingBatch.flush();
- if (!outgoingBatch.copy(svIndex)) {
- String msg = composeTooBigMsg(recordId, incoming);
- logger.debug(msg);
- throw new IOException(msg);
- }
- }
+ outgoingBatch.copy(svIndex);
}
break;
@@ -252,16 +228,13 @@ public abstract class PartitionerTemplate implements Partitioner {
this.statusHandler = statusHandler;
}
- protected boolean copy(int inIndex) throws IOException {
- if (doEval(inIndex, recordCount)) {
- recordCount++;
- totalRecords++;
- if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
- flush();
- }
- return true;
+ protected void copy(int inIndex) throws IOException {
+ doEval(inIndex, recordCount);
+ recordCount++;
+ totalRecords++;
+ if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
+ flush();
}
- return false;
}
@Override
@@ -273,7 +246,7 @@ public abstract class PartitionerTemplate implements Partitioner {
protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
@RuntimeOverridden
- protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
+ protected void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { };
public void flush() throws IOException {
if (dropAll) {
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 539d028..8f7812f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -64,6 +64,8 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -225,10 +227,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
private boolean doAlloc() {
//Allocate vv in the allocationVectors.
for (ValueVector v : this.allocationVectors) {
- //AllocationHelper.allocate(v, remainingRecordCount, 250);
- if (!v.allocateNewSafe()) {
- return false;
- }
+ AllocationHelper.allocateNew(v, incoming.getRecordCount());
}
//Allocate vv for complexWriters.
@@ -363,8 +362,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
TypedFieldId fid = container.getValueVectorId(outputField.getPath());
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
-
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
}
continue;
@@ -428,18 +425,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
ValueVector vector = container.addOrGet(outputField, callBack);
allocationVectors.add(vector);
TypedFieldId fid = container.getValueVectorId(outputField.getPath());
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
HoldingContainer hc = cg.addExpr(write);
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
logger.debug("Added eval for project expression.");
}
}
- cg.rotateBlock();
- cg.getEvalBlock()._return(JExpr.TRUE);
-
-
try {
this.projector = context.getImplementationClass(cg.getCodeGenerator());
projector.setup(context, incoming, this, transfers);