You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/15 14:12:31 UTC
[2/5] drill git commit: DRILL-1942-hygiene: - add AutoCloseable to many classes - minor fixes - formatting
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index c1d78c3..432e06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Filter;
@@ -44,11 +43,10 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
private SelectionVector2 sv2;
private SelectionVector4 sv4;
- private BufferAllocator.PreAllocator svAllocator;
private Filterer filter;
public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
@@ -84,7 +82,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
return IterOutcome.OK;
}
-
@Override
public void close() {
if (sv2 != null) {
@@ -152,15 +149,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
cg.addExpr(new ReturnValueExpression(expr));
-// for (VectorWrapper<?> i : incoming) {
-// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
-// container.add(v);
-// allocators.add(getAllocator4(v));
-// }
-
- for (VectorWrapper<?> vw : incoming) {
- for (ValueVector vv : vw.getValueVectors()) {
- TransferPair pair = vv.getTransferPair();
+ for (final VectorWrapper<?> vw : incoming) {
+ for (final ValueVector vv : vw.getValueVectors()) {
+ final TransferPair pair = vv.getTransferPair();
container.add(pair.getTo());
transfers.add(pair);
}
@@ -170,8 +161,8 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
container.buildSchema(SelectionVectorMode.FOUR_BYTE);
try {
- TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
- Filterer filter = context.getImplementationClass(cg);
+ final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+ final Filterer filter = context.getImplementationClass(cg);
filter.setup(context, incoming, this, tx);
return filter;
} catch (ClassTransformationException | IOException e) {
@@ -192,21 +183,18 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
cg.addExpr(new ReturnValueExpression(expr));
- for (VectorWrapper<?> v : incoming) {
- TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
+ for (final VectorWrapper<?> v : incoming) {
+ final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
-
try {
- TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
- Filterer filter = context.getImplementationClass(cg);
+ final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+ final Filterer filter = context.getImplementationClass(cg);
filter.setup(context, incoming, this, tx);
return filter;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 73f3435..21ebd9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -63,7 +63,6 @@ import com.sun.codemodel.JExpression;
import com.sun.codemodel.JVar;
public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
-
public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
@@ -186,8 +185,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
hjHelper = new HashJoinHelper(context, oContext.getAllocator());
try {
rightSchema = right.getSchema();
- VectorContainer vectors = new VectorContainer(oContext);
- for (VectorWrapper w : right) {
+ final VectorContainer vectors = new VectorContainer(oContext);
+ for (final VectorWrapper<?> w : right) {
vectors.addOrGet(w.getField());
}
vectors.buildSchema(SelectionVectorMode.NONE);
@@ -198,7 +197,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
setupHashTable();
hashJoinProbe = setupHashJoinProbe();
// Build the container schema and set the counts
- for (VectorWrapper w : container) {
+ for (final VectorWrapper<?> w : container) {
w.getValueVector().allocateNew();
}
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -243,8 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
state = BatchState.NOT_FIRST;
}
-
- for (VectorWrapper<?> v : container) {
+ for (final VectorWrapper<?> v : container) {
v.getValueVector().getMutator().setValueCount(outputRecords);
}
@@ -253,13 +251,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
} else {
// Our build side is empty, we won't have any matches, clear the probe side
if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
+ for (final VectorWrapper<?> wrapper : left) {
wrapper.getValueVector().clear();
}
left.kill(true);
leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
+ for (final VectorWrapper<?> wrapper : left) {
wrapper.getValueVector().clear();
}
leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
@@ -281,11 +279,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
-
// Setup the hash table configuration object
int conditionsSize = conditions.size();
-
- NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+ final NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
JoinComparator comparator = JoinComparator.NONE;
@@ -299,7 +295,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
assert comparator != JoinComparator.NONE;
- boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
+ final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
// Set the left named expression to be null if the probe batch is empty.
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
@@ -310,24 +306,23 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
}
- HashTableConfig htConfig =
+ final HashTableConfig htConfig =
new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
// Create the chained hash table
- ChainedHashTable ht =
+ final ChainedHashTable ht =
new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null,
areNullsEqual);
hashTable = ht.createAndSetupHashTable(null);
}
public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
//Setup the underlying hash table
// skip first batch if count is zero, as it may be an empty schema batch
if (right.getRecordCount() == 0) {
- for (VectorWrapper w : right) {
+ for (final VectorWrapper<?> w : right) {
w.clear();
}
rightUpstream = next(right);
@@ -336,9 +331,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
boolean moreData = true;
while (moreData) {
-
switch (rightUpstream) {
-
case OUT_OF_MEMORY:
case NONE:
case NOT_YET:
@@ -362,7 +355,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
// Fall through
case OK:
- int currentRecordCount = right.getRecordCount();
+ final int currentRecordCount = right.getRecordCount();
/* For every new build batch, we store some state in the helper context
* Add new state to the helper context
@@ -370,11 +363,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
hjHelper.addNewBatch(currentRecordCount);
// Holder contains the global index where the key is hashed into using the hash table
- IndexPointer htIndex = new IndexPointer();
+ final IndexPointer htIndex = new IndexPointer();
// For every record in the build batch , hash the key columns
for (int i = 0; i < currentRecordCount; i++) {
-
hashTable.put(i, htIndex, 1 /* retry count */);
/* Use the global index returned by the hash table, to store
@@ -388,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
* to the hyper vector container. Will be used when we want to retrieve
* records that have matching keys on the probe side.
*/
- RecordBatchData nextBatch = new RecordBatchData(right);
+ final RecordBatchData nextBatch = new RecordBatchData(right);
boolean success = false;
try {
if (hyperContainer == null) {
@@ -412,27 +404,22 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
}
-
public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
-
-
final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<HashJoinProbe> g = cg.getRoot();
+ final ClassGenerator<HashJoinProbe> g = cg.getRoot();
// Generate the code to project build side records
g.setMappingSet(projectBuildMapping);
-
int fieldId = 0;
- JExpression buildIndex = JExpr.direct("buildIndex");
- JExpression outIndex = JExpr.direct("outIndex");
+ final JExpression buildIndex = JExpr.direct("buildIndex");
+ final JExpression outIndex = JExpr.direct("outIndex");
g.rotateBlock();
if (rightSchema != null) {
- for (MaterializedField field : rightSchema) {
-
- MajorType inputType = field.getType();
- MajorType outputType;
+ for (final MaterializedField field : rightSchema) {
+ final MajorType inputType = field.getType();
+ final MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2197).
if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
@@ -447,8 +434,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Add the vector to our output container
container.addOrGet(projected);
- JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+ final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
+ final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
g.getEvalBlock().add(outVV.invoke("copyFromSafe")
.arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
@@ -463,14 +450,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
int outputFieldId = fieldId;
fieldId = 0;
- JExpression probeIndex = JExpr.direct("probeIndex");
- int recordCount = 0;
+ final JExpression probeIndex = JExpr.direct("probeIndex");
if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
- for (VectorWrapper<?> vv : left) {
-
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
+ for (final VectorWrapper<?> vv : left) {
+ final MajorType inputType = vv.getField().getType();
+ final MajorType outputType;
// If right or full outer join then the output type should be optional. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
@@ -481,30 +466,28 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
outputType = inputType;
}
- ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
+ final ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
if (v instanceof AbstractContainerVector) {
vv.getValueVector().makeTransferPair(v);
v.clear();
}
- JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+ final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+ final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
fieldId++;
outputFieldId++;
}
- recordCount = left.getRecordCount();
}
- HashJoinProbe hj = context.getImplementationClass(cg);
-
+ final HashJoinProbe hj = context.getImplementationClass(cg);
return hj;
}
private void allocateVectors() {
- for (VectorWrapper<?> v : container) {
+ for (final VectorWrapper<?> v : container) {
v.getValueVector().allocateNew();
}
}
@@ -514,8 +497,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
super(popConfig, context, true);
this.left = left;
this.right = right;
- this.joinType = popConfig.getJoinType();
- this.conditions = popConfig.getConditions();
+ joinType = popConfig.getJoinType();
+ conditions = popConfig.getConditions();
}
private void updateStats(HashTable htable) {
@@ -523,16 +506,16 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return;
}
htable.getStats(htStats);
- this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
- this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
- this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
- this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+ stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+ stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+ stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+ stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
}
@Override
public void killIncoming(boolean sendUpstream) {
- this.left.kill(sendUpstream);
- this.right.kill(sendUpstream);
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
@Override
@@ -551,5 +534,4 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
super.close();
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2d37fa5..8f2dad3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -39,9 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.server.options.DrillConfigIterator.Iter;
import org.apache.drill.exec.vector.AllocationHelper;
-
import com.google.common.base.Preconditions;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
@@ -183,7 +181,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
outputRecords = nljWorker.outputRecords();
// Set the record count
- for (VectorWrapper vw : container) {
+ for (final VectorWrapper<?> vw : container) {
vw.getValueVector().getMutator().setValueCount(outputRecords);
}
@@ -202,7 +200,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
}
right.kill(true);
while (hasMore(rightUpstream)) {
- for (VectorWrapper<?> wrapper : right) {
+ for (final VectorWrapper<?> wrapper : right) {
wrapper.getValueVector().clear();
}
rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
@@ -280,7 +278,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
* Simple method to allocate space for all the vectors in the container.
*/
private void allocateVectors() {
- for (VectorWrapper vw : container) {
+ for (final VectorWrapper<?> vw : container) {
AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
}
}
@@ -309,7 +307,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
if (leftUpstream != IterOutcome.NONE) {
leftSchema = left.getSchema();
- for (VectorWrapper vw : left) {
+ for (final VectorWrapper<?> vw : left) {
container.addOrGet(vw.getField());
}
@@ -321,7 +319,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
if (rightUpstream != IterOutcome.NONE) {
rightSchema = right.getSchema();
- for (VectorWrapper vw : right) {
+ for (final VectorWrapper<?> vw : right) {
container.addOrGet(vw.getField());
}
addBatchToHyperContainer(right);
@@ -341,7 +339,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
}
private void addBatchToHyperContainer(RecordBatch inputBatch) {
- RecordBatchData batchCopy = new RecordBatchData(inputBatch);
+ final RecordBatchData batchCopy = new RecordBatchData(inputBatch);
boolean success = false;
try {
rightCounts.addLast(inputBatch.getRecordCount());
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 4ea5a5c..06dd699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -33,19 +33,19 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import com.google.common.collect.Lists;
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
private int recordsToSkip;
private int recordsLeft;
- private boolean noEndLimit;
+ private final boolean noEndLimit;
private boolean skipBatch;
private boolean first = true;
- List<TransferPair> transfers = Lists.newArrayList();
+ private final List<TransferPair> transfers = Lists.newArrayList();
- public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+ throws OutOfMemoryException {
super(popConfig, context, incoming);
outgoingSv = new SelectionVector2(oContext.getAllocator());
recordsToSkip = popConfig.getFirst();
@@ -62,14 +62,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
transfers.clear();
- for(VectorWrapper<?> v : incoming){
- TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
+ for(final VectorWrapper<?> v : incoming) {
+ final TransferPair pair = v.getValueVector().makeTransferPair(
+ container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
- BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+ final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
- switch(svMode){
+ switch(svMode) {
case NONE:
break;
case TWO_BYTE:
@@ -98,7 +99,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-
// Clear the memory for the incoming batch
for (VectorWrapper<?> wrapper : incoming) {
wrapper.getValueVector().clear();
@@ -126,15 +126,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
first = false;
}
skipBatch = false;
- int recordCount = incoming.getRecordCount();
+ final int recordCount = incoming.getRecordCount();
if (recordCount == 0) {
skipBatch = true;
return IterOutcome.OK;
}
- for(TransferPair tp : transfers) {
+ for(final TransferPair tp : transfers) {
tp.transfer();
}
- if(recordCount <= recordsToSkip) {
+ if (recordCount <= recordsToSkip) {
recordsToSkip -= recordCount;
skipBatch = true;
} else {
@@ -149,8 +149,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
return IterOutcome.OK;
}
+ // These two functions are identical except for the computation of the index; merge
private void limitWithNoSV(int recordCount) {
- int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+ final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
recordsToSkip -= offset;
int fetch;
@@ -162,15 +163,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
int svIndex = 0;
- for(char i = (char) offset; i < fetch; i++) {
+ for(char i = (char) offset; i < fetch; svIndex++, i++) {
outgoingSv.setIndex(svIndex, i);
- svIndex++;
}
outgoingSv.setRecordCount(svIndex);
}
private void limitWithSV(int recordCount) {
- int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+ final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
recordsToSkip -= offset;
int fetch;
@@ -182,10 +182,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
int svIndex = 0;
- for(int i = offset; i < fetch; i++) {
- char index = incomingSv.getIndex(i);
+ for(int i = offset; i < fetch; svIndex++, i++) {
+ final char index = incomingSv.getIndex(i);
outgoingSv.setIndex(svIndex, index);
- svIndex++;
}
outgoingSv.setRecordCount(svIndex);
}
@@ -196,9 +195,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- public void close(){
+ public void close() {
outgoingSv.clear();
super.close();
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 0050b45..3061f99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -159,8 +159,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
@Override
public void close() {
super.close();
- this.partitionVectors.clear();
- this.partitionKeyVector.clear();
+ partitionVectors.clear();
+ partitionKeyVector.clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 31fc160..6e49e78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -58,8 +58,7 @@ import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
public class PartitionSenderRootExec extends BaseRootExec {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
private RecordBatch incoming;
private HashPartitionSender operator;
private PartitionerDecorator partitioner;
@@ -105,10 +104,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
this.incoming = incoming;
this.operator = operator;
this.context = context;
- this.outGoingBatchCount = operator.getDestinations().size();
- this.popConfig = operator;
- this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
- this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+ outGoingBatchCount = operator.getDestinations().size();
+ popConfig = operator;
+ remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
+ remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
// Algorithm to figure out number of threads to parallelize output
// numberOfRows/sliceTarget/numReceivers/threadfactor
@@ -137,7 +136,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
@Override
public boolean innerNext() {
-
if (!ok) {
return false;
}
@@ -332,6 +330,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
+ @Override
public void close() throws Exception {
logger.debug("Partition sender stopping.");
super.close();
@@ -340,7 +339,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
updateAggregateStats();
partitioner.clear();
}
-
}
public void sendEmptyBatch(boolean isLast) {
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index b9a1641..98ee320 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -108,7 +108,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
return false;
} else {
container.clear();
- for (final VectorWrapper w : newContainer) {
+ for (final VectorWrapper<?> w : newContainer) {
container.add(w.getValueVector());
}
container.buildSchema(SelectionVectorMode.NONE);
@@ -118,7 +118,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
private class Producer implements Runnable {
-
RecordBatchDataWrapper wrapper;
@Override
@@ -206,7 +205,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
cleanUpLatch.await();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
- // TODO InterruptedException
+ // TODO we should retry to wait for the latch
} finally {
super.close();
clearQueue();
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 407f05d..cebefa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -92,9 +92,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
if (schema != null) {
if (getSelectionVector4().next()) {
return IterOutcome.OK;
- } else {
- return IterOutcome.NONE;
}
+
+ return IterOutcome.NONE;
}
try{
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index b5b1b0a..cb04244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -185,7 +185,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- public void close(){
+ public void close() {
super.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index f1da1db..701ead5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -171,7 +171,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
@Override
protected void buildSchema() throws SchemaChangeException {
logger.trace("buildSchema()");
- IterOutcome outcome = next(incoming);
+ final IterOutcome outcome = next(incoming);
switch (outcome) {
case NONE:
state = BatchState.DONE;
@@ -208,7 +208,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
container.clear();
// all existing vectors will be transferred to the outgoing container in framer.doWork()
- for (VectorWrapper wrapper : batch) {
+ for (final VectorWrapper<?> wrapper : batch) {
container.addOrGet(wrapper.getField());
}
@@ -292,10 +292,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
cg.setMappingSet(rightMapping);
ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
- LogicalExpression fh =
+ final LogicalExpression fh =
FunctionGenerationHelper
.getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
- ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+ final ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.getEvalBlock()._return(JExpr.TRUE);
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index ed32f43..31deada 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -134,7 +134,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
copierAllocator = oContext.getAllocator().getChildAllocator(
- context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
+ context, PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION, true);
FragmentHandle handle = context.getHandle();
fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
@@ -188,7 +188,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
sv4.clear();
}
if (copier != null) {
- copier.cleanup();
+ copier.close();
}
copierAllocator.close();
super.close();
@@ -707,7 +707,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
g.setMappingSet(MAIN_MAPPING);
copier = context.getImplementationClass(cg);
} else {
- copier.cleanup();
+ copier.close();
}
BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator();
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 37529ff..d42e8d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -37,14 +37,13 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;
-public abstract class MSortTemplate implements MSorter, IndexedSortable{
+public abstract class MSortTemplate implements MSorter, IndexedSortable {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
private SelectionVector4 vector4;
private SelectionVector4 aux;
private long compares;
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
- private Queue<Integer> newRunStarts;
private FragmentContext context;
/**
@@ -67,7 +66,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
final int newBatch = this.vector4.get(i) >>> 16;
if (newBatch == batch) {
continue;
- } else if(newBatch == batch + 1) {
+ } else if (newBatch == batch + 1) {
runStarts.add(i);
batch = newBatch;
} else {
@@ -135,7 +134,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
int outIndex = 0;
- newRunStarts = Queues.newLinkedBlockingQueue();
+ final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
newRunStarts.add(outIndex);
final int size = runStarts.size();
for (int i = 0; i < size / 2; i++) {
@@ -155,9 +154,9 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount);
aux.clear();
- aux = this.vector4.createNewWrapperCurrent(desiredRecordBatchCount);
+ aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount);
vector4.clear();
- this.vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount);
+ vector4 = tmp.createNewWrapperCurrent(desiredRecordBatchCount);
tmp.clear();
runStarts = newRunStarts;
}
@@ -198,5 +197,4 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index 161ca6a..e0d9c2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -25,15 +25,18 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
-public interface PriorityQueueCopier {
- public static long initialAllocation = 10000000;
- public static long maxAllocation = 20000000;
+public interface PriorityQueueCopier extends AutoCloseable {
+ public static final long INITIAL_ALLOCATION = 10000000;
+ public static final long MAX_ALLOCATION = 20000000;
+
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch,
+ List<BatchGroup> batchGroups, VectorAccessible outgoing) throws SchemaChangeException;
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
- VectorAccessible outgoing) throws SchemaChangeException;
public int next(int targetRecordCount);
- public void cleanup();
- public static TemplateClassDefinition<PriorityQueueCopier> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueueCopier>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
+ public final static TemplateClassDefinition<PriorityQueueCopier> TEMPLATE_DEFINITION =
+ new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
+ @Override
+ abstract public void close(); // specify this to leave out the Exception
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index facf192..891907a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -66,7 +66,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
allocateVectors(targetRecordCount);
for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
if (queueSize == 0) {
- cleanup();
+ close();
return 0;
}
int compoundIndex = vector4.get(0);
@@ -96,12 +96,12 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
@Override
- public void cleanup() {
+ public void close() {
vector4.clear();
- for (VectorWrapper w: outgoing) {
+ for (final VectorWrapper<?> w: outgoing) {
w.getValueVector().clear();
}
- for (VectorWrapper w : hyperBatch) {
+ for (final VectorWrapper<?> w : hyperBatch) {
w.clear();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 8731739..d8f703e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
- final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
protected final VectorContainer container;
protected final T popConfig;
@@ -51,8 +51,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
- final OperatorContext oContext) throws OutOfMemoryException {
- super();
+ final OperatorContext oContext) {
this.context = context;
this.popConfig = popConfig;
this.oContext = oContext;
@@ -119,6 +118,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return next;
}
+ @Override
public final IterOutcome next() {
try {
stats.startProcessing();
@@ -174,11 +174,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected abstract void killIncoming(boolean sendUpstream);
- public void close(){
+ @Override
+ public void close() {
container.clear();
}
-
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
@@ -199,7 +199,6 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return container.getValueAccessorById(clazz, ids);
}
-
@Override
public WritableBatch getWritableBatch() {
// logger.debug("Getting writable batch.");
@@ -212,5 +211,4 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index dd90cab..e84057b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -25,7 +25,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
- final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
protected final RecordBatch incoming;
protected boolean outOfMemory = false;
@@ -51,7 +51,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
IterOutcome upstream = next(incoming);
if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
do {
- for (VectorWrapper w : incoming) {
+ for (final VectorWrapper<?> w : incoming) {
w.clear();
}
} while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
@@ -118,9 +118,9 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
public BatchSchema getSchema() {
if (container.hasSchema()) {
return container.getSchema();
- } else {
- return null;
}
+
+ return null;
}
protected abstract boolean setupNewSchema() throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index f2f9450..732129a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -25,21 +25,19 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.rpc.data.AckSender;
public class RawFragmentBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
private final FragmentRecordBatch header;
private final DrillBuf body;
private final AckSender sender;
-
- private AtomicBoolean ackSent = new AtomicBoolean(false);
+ private final AtomicBoolean ackSent = new AtomicBoolean(false);
public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
- super();
this.header = header;
- this.body = body;
this.sender = sender;
+ this.body = body;
if (body != null) {
- body.retain();
+ body.retain(1);
}
}
@@ -58,11 +56,10 @@ public class RawFragmentBatch {
public void release() {
if (body != null) {
- body.release();
+ body.release(1);
}
}
-
public AckSender getSender() {
return sender;
}
@@ -80,5 +77,4 @@ public class RawFragmentBatch {
public boolean isAckSent() {
return ackSent.get();
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 8e3b9e5..55ae309 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.StackTrace;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -34,9 +35,11 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
@@ -63,14 +66,14 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
if (logger.isTraceEnabled()) {
logger.trace("Loading record batch with def {} and data {}", def, buf);
- logger.trace("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
+ logger.trace("Load, ThreadID: {}\n{}", Thread.currentThread().getId(), new StackTrace());
}
container.zeroVectors();
valueCount = def.getRecordCount();
boolean schemaChanged = schema == null;
final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
- for (final VectorWrapper wrapper : container) {
+ for(final VectorWrapper<?> wrapper : container) {
final ValueVector vector = wrapper.getValueVector();
oldFields.put(vector.getField(), vector);
}
@@ -79,7 +82,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
try {
final List<SerializedField> fields = def.getFieldList();
int bufOffset = 0;
- for (final SerializedField field : fields) {
+ for(final SerializedField field : fields) {
final MaterializedField fieldDef = MaterializedField.create(field);
ValueVector vector = oldFields.remove(fieldDef);
@@ -106,7 +109,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
// rebuild the schema.
final SchemaBuilder builder = BatchSchema.newBuilder();
- for (VectorWrapper<?> v : newVectors) {
+ for (final VectorWrapper<?> v : newVectors) {
builder.addField(v.getField());
}
builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -116,7 +119,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
} catch (final Throwable cause) {
// We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
// adjudicate to call upper layer specific clean up logic.
- for (final VectorWrapper wrapper:newVectors) {
+ for (final VectorWrapper<?> wrapper:newVectors) {
wrapper.getValueVector().clear();
}
throw cause;
@@ -132,12 +135,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
return schemaChanged;
}
+ @Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return container.getValueVectorId(path);
}
-
-
//
// @SuppressWarnings("unchecked")
// public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) {
@@ -152,10 +154,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
// return (T) v;
// }
+ @Override
public int getRecordCount() {
return valueCount;
}
+ @Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
return container.getValueAccessorById(clazz, ids);
}
@@ -170,11 +174,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
return this.container.iterator();
}
- public BatchSchema getSchema(){
+ @Override
+ public BatchSchema getSchema() {
return schema;
}
- public void clear(){
+ public void clear() {
container.clear();
}
@@ -184,7 +189,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
// rebuild the schema.
SchemaBuilder b = BatchSchema.newBuilder();
- for(VectorWrapper<?> v : container){
+ for(final VectorWrapper<?> v : container){
b.addField(v.getField());
}
b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
http://git-wip-us.apache.org/repos/asf/drill/blob/dca98ef6/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 6e27628..8ca3ec8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -80,7 +80,7 @@ public abstract class AbstractRecordReader implements RecordReader {
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
- for (ValueVector v : vectorMap.values()) {
+ for (final ValueVector v : vectorMap.values()) {
v.allocateNew();
}
}