Posted to commits@drill.apache.org by jn...@apache.org on 2017/09/05 21:05:07 UTC
[2/3] drill git commit: DRILL-5546: Handle schema change exception
failure caused by empty input or empty batch.
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 5afe66b..4d623cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,18 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.union;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.util.Pair;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -39,88 +36,96 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.resolver.TypeCastRules;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
- private List<MaterializedField> outputFields;
+ private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private UnionAller unionall;
- private UnionAllInput unionAllInput;
- private RecordBatch current;
-
private final List<TransferPair> transfers = Lists.newArrayList();
- private List<ValueVector> allocationVectors;
- protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private List<ValueVector> allocationVectors = Lists.newArrayList();
private int recordCount = 0;
- private boolean schemaAvailable = false;
+ private UnionInputIterator unionInputIterator;
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
- super(config, context, false);
- assert (children.size() == 2) : "The number of the operands of Union must be 2";
- unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
- }
-
- @Override
- public int getRecordCount() {
- return recordCount;
+ super(config, context, true, children.get(0), children.get(1));
}
@Override
protected void killIncoming(boolean sendUpstream) {
- unionAllInput.getLeftRecordBatch().kill(sendUpstream);
- unionAllInput.getRightRecordBatch().kill(sendUpstream);
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
- }
+ protected void buildSchema() throws SchemaChangeException {
+ if (!prefetchFirstBatchFromBothSides()) {
+ return;
+ }
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
+ unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide(right.getSchema());
+ } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide(left.getSchema());
+ } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+ }
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ VectorAccessibleUtilities.allocateVectors(container, 0);
+ VectorAccessibleUtilities.setValueCount(container, 0);
}
@Override
public IterOutcome innerNext() {
try {
- IterOutcome upstream = unionAllInput.nextBatch();
- logger.debug("Upstream of Union-All: {}", upstream);
- switch (upstream) {
+ while (true) {
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+
+ Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+ IterOutcome upstream = nextBatch.left;
+ RecordBatch incoming = nextBatch.right;
+
+ switch (upstream) {
case NONE:
case OUT_OF_MEMORY:
case STOP:
return upstream;
-
case OK_NEW_SCHEMA:
- outputFields = unionAllInput.getOutputFields();
+ return doWork(incoming, true);
case OK:
- IterOutcome workOutcome = doWork();
-
- if (workOutcome != IterOutcome.OK) {
- return workOutcome;
- } else {
- return upstream;
+ // Skip batches that have the same schema as the previous one but contain 0 rows.
+ if (incoming.getRecordCount() == 0) {
+ VectorAccessibleUtilities.clear(incoming);
+ continue;
}
+ return doWork(incoming, false);
default:
throw new IllegalStateException(String.format("Unknown state %s.", upstream));
+ }
}
} catch (ClassTransformationException | IOException | SchemaChangeException ex) {
context.fail(ex);
@@ -130,120 +135,75 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
}
@Override
- public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
+ public int getRecordCount() {
+ return recordCount;
}
- private void setValueCount(int count) {
- for (ValueVector v : allocationVectors) {
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(count);
- }
- }
-
- private boolean doAlloc() {
- for (ValueVector v : allocationVectors) {
- try {
- AllocationHelper.allocateNew(v, current.getRecordCount());
- } catch (OutOfMemoryException ex) {
- return false;
- }
- }
- return true;
- }
@SuppressWarnings("resource")
- private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
- if (allocationVectors != null) {
- for (ValueVector v : allocationVectors) {
- v.clear();
- }
+ private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
+ Preconditions.checkArgument(inputBatch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
+ "Input batch and output batch have different field counthas!");
+
+ if (newSchema) {
+ createUnionAller(inputBatch);
}
- allocationVectors = Lists.newArrayList();
- transfers.clear();
+ container.zeroVectors();
+ VectorUtil.allocateVectors(allocationVectors, inputBatch.getRecordCount());
+ recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+ VectorUtil.setValueCount(allocationVectors, recordCount);
- // If both sides of Union-All are empty
- if (unionAllInput.isBothSideEmpty()) {
- for (MaterializedField materializedField : outputFields) {
- final String colName = materializedField.getName();
- final MajorType majorType = MajorType.newBuilder()
- .setMinorType(MinorType.INT)
- .setMode(DataMode.OPTIONAL)
- .build();
-
- MaterializedField outputField = MaterializedField.create(colName, majorType);
- ValueVector vv = container.addOrGet(outputField, callBack);
- allocationVectors.add(vv);
- }
-
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ return IterOutcome.OK;
}
+ }
+
+ private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
+ transfers.clear();
+ allocationVectors.clear();
final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
// Uncomment this line to debug the generated code.
-// cg.getCodeGenerator().saveCodeForDebugging(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
+
int index = 0;
- for (VectorWrapper<?> vw : current) {
- ValueVector vvIn = vw.getValueVector();
- // get the original input column names
- SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
- // get the renamed column names
- SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getName());
+ for (VectorWrapper<?> vw : inputBatch) {
+ ValueVector vvIn = vw.getValueVector();
+ ValueVector vvOut = container.getValueVector(index).getValueVector();
final ErrorCollector collector = new ErrorCollectorImpl();
// According to input data names, Minortypes, Datamodes, choose to
// transfer directly,
// rename columns or
// cast data types (Minortype or DataMode)
- if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
+ if (container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField())
+ && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, maps are excluded due to an existing transfer bug
+ ) {
// Transfer column
+ TransferPair tp = vvIn.makeTransferPair(vvOut);
+ transfers.add(tp);
+ } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
+ continue;
+ } else { // Copy data in order to rename and/or cast the column
+ SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
+ MaterializedField inField = vvIn.getField();
+ MaterializedField outputField = vvOut.getField();
- MajorType outputFieldType = outputFields.get(index).getType();
- MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
- outputFieldType);
-
- /*
- todo: Fix if condition when DRILL-4824 is merged
- If condition should be changed to:
- `if (outputFields.get(index).getName().equals(inputPath.getRootSegmentPath())) {`
- DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
- Root cause is missing indication of child column in map types when it is null.
- DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
- Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
- Unit test - TestJsonReader.testKvgenWithUnionAll().
- */
- if (outputFields.get(index).getName().equals(inputPath)) {
- ValueVector vvOut = container.addOrGet(outputField);
- TransferPair tp = vvIn.makeTransferPair(vvOut);
- transfers.add(tp);
- // Copy data in order to rename the column
- } else {
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
- if (collector.hasErrors()) {
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
- ValueVector vv = container.addOrGet(outputField, callBack);
- allocationVectors.add(vv);
- TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
- cg.addExpr(write);
- }
- // Cast is necessary
- } else {
- LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
// If the inputs' DataMode is required and the outputs' DataMode is not required
// cast to the one with the least restriction
- if (vvIn.getField().getType().getMode() == DataMode.REQUIRED
- && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
- expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
+ if (inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
+ && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
+ expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -251,442 +211,163 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
// If two inputs' MinorTypes are different,
// Insert a cast before the Union operation
- if (vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
- expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
+ if (inField.getType().getMinorType() != outputField.getType().getMinorType()) {
+ expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
}
- final MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
- expr.getMajorType());
- ValueVector vector = container.addOrGet(outputField, callBack);
- allocationVectors.add(vector);
- TypedFieldId fid = container.getValueVectorId(outputPath);
+ TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
cg.addExpr(write);
+
+ allocationVectors.add(vvOut);
}
++index;
}
unionall = context.getImplementationClass(cg.getCodeGenerator());
- unionall.setup(context, current, this, transfers);
-
- if (!schemaAvailable) {
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- schemaAvailable = true;
- }
-
- if (!doAlloc()) {
- return IterOutcome.OUT_OF_MEMORY;
- }
-
- recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
- setValueCount(recordCount);
- return IterOutcome.OK;
- }
-
- public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
- return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
- && (leftField.getType().getMode() == rightField.getType().getMode());
+ unionall.setup(context, inputBatch, this, transfers);
}
- // This method is used by inner class to point the reference `current` to the correct record batch
- private void setCurrentRecordBatch(RecordBatch target) {
- this.current = target;
- }
- // This method is used by inner class to clear the current record batch
- private void clearCurrentRecordBatch() {
- for (VectorWrapper<?> v: current) {
- v.clear();
- }
- }
-
- public static class UnionAllInput {
- private UnionAllRecordBatch unionAllRecordBatch;
- private List<MaterializedField> outputFields;
- private OneSideInput leftSide;
- private OneSideInput rightSide;
- private IterOutcome upstream = IterOutcome.NOT_YET;
- private boolean leftIsFinish = false;
- private boolean rightIsFinish = false;
-
- // These two schemas are obtained from the first record batches of the left and right inputs
- // They are used to check if the schema is changed between recordbatches
- private BatchSchema leftSchema;
- private BatchSchema rightSchema;
- private boolean bothEmpty = false;
-
- public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
- this.unionAllRecordBatch = unionAllRecordBatch;
- leftSide = new OneSideInput(left);
- rightSide = new OneSideInput(right);
- }
-
- private void setBothSideEmpty(boolean bothEmpty) {
- this.bothEmpty = bothEmpty;
- }
-
- private boolean isBothSideEmpty() {
- return bothEmpty;
- }
-
- public IterOutcome nextBatch() throws SchemaChangeException {
- if (upstream == RecordBatch.IterOutcome.NOT_YET) {
- IterOutcome iterLeft = leftSide.nextBatch();
- switch (iterLeft) {
- case OK_NEW_SCHEMA:
- /*
- * If the first few record batches are all empty,
- * there is no way to tell whether these empty batches are coming from empty files.
- * It is incorrect to infer output types when either side could be coming from empty.
- *
- * Thus, while-loop is necessary to skip those empty batches.
- */
- whileLoop:
- while (leftSide.getRecordBatch().getRecordCount() == 0) {
- iterLeft = leftSide.nextBatch();
-
- switch(iterLeft) {
- case STOP:
- case OUT_OF_MEMORY:
- return iterLeft;
-
- case NONE:
- // Special Case: The left side was an empty input.
- leftIsFinish = true;
- break whileLoop;
-
- case NOT_YET:
- case OK_NEW_SCHEMA:
- case OK:
- continue whileLoop;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterLeft));
- }
- }
-
- break;
- case STOP:
- case OUT_OF_MEMORY:
- return iterLeft;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterLeft));
- }
-
- IterOutcome iterRight = rightSide.nextBatch();
- switch (iterRight) {
- case OK_NEW_SCHEMA:
- // Unless there is no record batch on the left side of the inputs,
- // always start processing from the left side.
- if (leftIsFinish) {
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- } else {
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- }
- // If the record count of the first batch from right input is zero,
- // there are two possibilities:
- // 1. The right side is an empty input (e.g., file).
- // 2. There will be more records carried by later batches.
-
- /*
- * If the first few record batches are all empty,
- * there is no way to tell whether these empty batches are coming from empty files.
- * It is incorrect to infer output types when either side could be coming from empty.
- *
- * Thus, while-loop is necessary to skip those empty batches.
- */
- whileLoop:
- while (rightSide.getRecordBatch().getRecordCount() == 0) {
- iterRight = rightSide.nextBatch();
- switch (iterRight) {
- case STOP:
- case OUT_OF_MEMORY:
- return iterRight;
-
- case NONE:
- // Special Case: The right side was an empty input.
- rightIsFinish = true;
- break whileLoop;
-
- case NOT_YET:
- case OK_NEW_SCHEMA:
- case OK:
- continue whileLoop;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterRight));
- }
- }
-
- if (leftIsFinish && rightIsFinish) {
- setBothSideEmpty(true);
- }
-
- inferOutputFields();
- break;
-
- case STOP:
- case OUT_OF_MEMORY:
- return iterRight;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterRight));
- }
-
-
-
- upstream = IterOutcome.OK_NEW_SCHEMA;
- return upstream;
+ // The output table's column names always follow the left table,
+ // while the output types are chosen based on Drill's implicit casting rules.
+ private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
+ final Iterator<MaterializedField> leftIter = leftSchema.iterator();
+ final Iterator<MaterializedField> rightIter = rightSchema.iterator();
+
+ int index = 1;
+ while (leftIter.hasNext() && rightIter.hasNext()) {
+ MaterializedField leftField = leftIter.next();
+ MaterializedField rightField = rightIter.next();
+
+ if (leftField.hasSameTypeAndMode(rightField)) {
+ TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+ builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
+ container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
+ } else if (Types.isUntypedNull(rightField.getType())) {
+ container.addOrGet(leftField, callBack);
+ } else if (Types.isUntypedNull(leftField.getType())) {
+ container.addOrGet(MaterializedField.create(leftField.getName(), rightField.getType()), callBack);
} else {
- if (isBothSideEmpty()) {
- return IterOutcome.NONE;
- }
-
- unionAllRecordBatch.clearCurrentRecordBatch();
-
- if (leftIsFinish && rightIsFinish) {
- upstream = IterOutcome.NONE;
- return upstream;
- } else if (leftIsFinish) {
- IterOutcome iterOutcome = rightSide.nextBatch();
-
- switch (iterOutcome) {
- case NONE:
- rightIsFinish = true;
- // fall through
- case STOP:
- case OUT_OF_MEMORY:
- upstream = iterOutcome;
- return upstream;
-
- case OK_NEW_SCHEMA:
- if (!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
- throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
- }
- iterOutcome = IterOutcome.OK;
- // fall through
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
- } else if (rightIsFinish) {
- IterOutcome iterOutcome = leftSide.nextBatch();
- switch (iterOutcome) {
- case STOP:
- case OUT_OF_MEMORY:
- case NONE:
- upstream = iterOutcome;
- return upstream;
-
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
- }
- } else {
- IterOutcome iterOutcome = leftSide.nextBatch();
-
- switch (iterOutcome) {
- case STOP:
- case OUT_OF_MEMORY:
- upstream = iterOutcome;
- return upstream;
-
- case OK_NEW_SCHEMA:
- if (!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
- throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
- }
-
- iterOutcome = IterOutcome.OK;
- // fall through
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- case NONE:
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- upstream = IterOutcome.OK;
- leftIsFinish = true;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
- }
- }
- }
-
- /**
- *
- * Summarize the inference in the four different situations:
- * First of all, the field names are always determined by the left side
- * (Even when the left side is from an empty file, we have the column names.)
- *
- * Cases:
- * 1. Left: non-empty; Right: non-empty
- * types determined by both sides with implicit casting involved
- * 2. Left: empty; Right: non-empty
- * type from the right
- * 3. Left: non-empty; Right: empty
- * types from the left
- * 4. Left: empty; Right: empty
- * types are nullable integer
- */
- private void inferOutputFields() {
- if (!leftIsFinish && !rightIsFinish) {
- // Both sides are non-empty
- inferOutputFieldsBothSide();
- } else if (!rightIsFinish) {
- // Left side is non-empty
- // While use left side's column names as output column names,
- // use right side's column types as output column types.
- inferOutputFieldsFromSingleSide(
- leftSide.getRecordBatch().getSchema(),
- rightSide.getRecordBatch().getSchema());
- } else {
- // Either right side is empty or both are empty
- // Using left side's schema is sufficient
- inferOutputFieldsFromSingleSide(
- leftSide.getRecordBatch().getSchema(),
- leftSide.getRecordBatch().getSchema());
- }
- }
-
- // The output table's column names always follow the left table,
- // where the output type is chosen based on DRILL's implicit casting rules
- private void inferOutputFieldsBothSide() {
- outputFields = Lists.newArrayList();
- leftSchema = leftSide.getRecordBatch().getSchema();
- rightSchema = rightSide.getRecordBatch().getSchema();
- Iterator<MaterializedField> leftIter = leftSchema.iterator();
- Iterator<MaterializedField> rightIter = rightSchema.iterator();
-
- int index = 1;
- while (leftIter.hasNext() && rightIter.hasNext()) {
- MaterializedField leftField = leftIter.next();
- MaterializedField rightField = rightIter.next();
-
- if (hasSameTypeAndMode(leftField, rightField)) {
- MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+ // If the output types differ,
+ // cast the column of one of the tables to the least restrictive data type
+ TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
+ if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
+ builder.setMinorType(leftField.getType().getMinorType());
builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
- outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
} else {
- // If the output type is not the same,
- // cast the column of one of the table to a data type which is the Least Restrictive
- MajorType.Builder builder = MajorType.newBuilder();
- if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
- builder.setMinorType(leftField.getType().getMinorType());
- builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
- } else {
- List<MinorType> types = Lists.newLinkedList();
- types.add(leftField.getType().getMinorType());
- types.add(rightField.getType().getMinorType());
- MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
- if (outputMinorType == null) {
- throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
- " on the left side and " + rightField.getType().getMinorType().toString() +
- " on the right side in column " + index + " of UNION ALL");
- }
- builder.setMinorType(outputMinorType);
+ List<TypeProtos.MinorType> types = Lists.newLinkedList();
+ types.add(leftField.getType().getMinorType());
+ types.add(rightField.getType().getMinorType());
+ TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
+ if (outputMinorType == null) {
+ throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
+ " on the left side and " + rightField.getType().getMinorType().toString() +
+ " on the right side in column " + index + " of UNION ALL");
}
-
- // The output data mode should be as flexible as the more flexible one from the two input tables
- List<DataMode> dataModes = Lists.newLinkedList();
- dataModes.add(leftField.getType().getMode());
- dataModes.add(rightField.getType().getMode());
- builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
-
- outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
+ builder.setMinorType(outputMinorType);
}
- ++index;
- }
- assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
- }
-
- private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
- outputFields = Lists.newArrayList();
+ // The output data mode should be as flexible as the more flexible one from the two input tables
+ List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
+ dataModes.add(leftField.getType().getMode());
+ dataModes.add(rightField.getType().getMode());
+ builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
- final List<String> outputColumnNames = Lists.newArrayList();
- for (MaterializedField materializedField : schemaForNames) {
- outputColumnNames.add(materializedField.getName());
- }
-
- final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
- for (int i = 0; iterForTypes.hasNext(); ++i) {
- MaterializedField field = iterForTypes.next();
- outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
+ container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
}
+ ++index;
}
- public List<MaterializedField> getOutputFields() {
- if (outputFields == null) {
- throw new NullPointerException("Output fields have not been inferred");
- }
-
- return outputFields;
- }
+ assert !leftIter.hasNext() && !rightIter.hasNext() : "Mismatch of column count should have been detected when validating the sqlNode at planning time";
+ }
- public void killIncoming(boolean sendUpstream) {
- leftSide.getRecordBatch().kill(sendUpstream);
- rightSide.getRecordBatch().kill(sendUpstream);
+ private void inferOutputFieldsOneSide(final BatchSchema schema) {
+ for (MaterializedField field : schema) {
+ container.addOrGet(field, callBack);
}
+ }
- public RecordBatch getLeftRecordBatch() {
- return leftSide.getRecordBatch();
- }
+ private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
+ return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
+ && (leftField.getType().getMode() == rightField.getType().getMode());
+ }
- public RecordBatch getRightRecordBatch() {
- return rightSide.getRecordBatch();
+ private class BatchStatusWrapper {
+ boolean prefetched;
+ final RecordBatch batch;
+ final int inputIndex;
+ final IterOutcome outcome;
+
+ BatchStatusWrapper(boolean prefetched, IterOutcome outcome, RecordBatch batch, int inputIndex) {
+ this.prefetched = prefetched;
+ this.outcome = outcome;
+ this.batch = batch;
+ this.inputIndex = inputIndex;
}
+ }
- private class OneSideInput {
- private IterOutcome upstream = IterOutcome.NOT_YET;
- private RecordBatch recordBatch;
+ private class UnionInputIterator implements Iterator<Pair<IterOutcome, RecordBatch>> {
+ private Stack<BatchStatusWrapper> batchStatusStack = new Stack<>();
- public OneSideInput(RecordBatch recordBatch) {
- this.recordBatch = recordBatch;
+ UnionInputIterator(IterOutcome leftOutcome, RecordBatch left, IterOutcome rightOutcome, RecordBatch right) {
+ if (rightOutcome == IterOutcome.OK_NEW_SCHEMA) {
+ batchStatusStack.push(new BatchStatusWrapper(true, IterOutcome.OK_NEW_SCHEMA, right, 1));
}
- public RecordBatch getRecordBatch() {
- return recordBatch;
+ if (leftOutcome == IterOutcome.OK_NEW_SCHEMA) {
+ batchStatusStack.push(new BatchStatusWrapper(true, IterOutcome.OK_NEW_SCHEMA, left, 0));
}
+ }
- public IterOutcome nextBatch() {
- if (upstream == IterOutcome.NONE) {
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
+ @Override
+ public boolean hasNext() {
+ return !batchStatusStack.isEmpty();
+ }
- if (upstream == IterOutcome.NOT_YET) {
- upstream = unionAllRecordBatch.next(recordBatch);
+ @Override
+ public Pair<IterOutcome, RecordBatch> next() {
+ while (!batchStatusStack.isEmpty()) {
+ BatchStatusWrapper topStatus = batchStatusStack.peek();
- return upstream;
+ if (topStatus.prefetched) {
+ topStatus.prefetched = false;
+ return Pair.of(topStatus.outcome, topStatus.batch);
} else {
- do {
- upstream = unionAllRecordBatch.next(recordBatch);
- } while (upstream == IterOutcome.OK && recordBatch.getRecordCount() == 0);
-
- return upstream;
+ IterOutcome outcome = UnionAllRecordBatch.this.next(topStatus.inputIndex, topStatus.batch);
+ switch (outcome) {
+ case OK:
+ case OK_NEW_SCHEMA:
+ return Pair.of(outcome, topStatus.batch);
+ case OUT_OF_MEMORY:
+ case STOP:
+ batchStatusStack.pop();
+ return Pair.of(outcome, topStatus.batch);
+ case NONE:
+ batchStatusStack.pop();
+ if (batchStatusStack.isEmpty()) {
+ return Pair.of(IterOutcome.NONE, null);
+ }
+ break;
+ default:
+ throw new IllegalStateException(String.format("Unexpected state %s", outcome));
+ }
}
}
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
}
}
+
}
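
For readers following the inference above: when both inputs have a schema, inferOutputFieldsBothSide() resolves each output column's type with Drill's implicit casting rules. Below is a minimal standalone sketch of that rule, factored into a hypothetical helper — resolveOutputType() is not part of this commit, while the TypeCastRules and Types calls are the ones actually used above:

  // Hypothetical helper summarizing the UNION ALL output-type rule:
  // same minor type  -> keep it, recalculating precision/scale;
  // different types  -> take the least restrictive common minor type;
  // data mode        -> always the least restrictive of the two.
  private TypeProtos.MajorType resolveOutputType(MaterializedField leftField, MaterializedField rightField, int columnIndex) {
    TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
    if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
      builder.setMinorType(leftField.getType().getMinorType());
      builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    } else {
      List<TypeProtos.MinorType> types = Lists.newLinkedList();
      types.add(leftField.getType().getMinorType());
      types.add(rightField.getType().getMinorType());
      TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
      if (outputMinorType == null) {
        throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType()
            + " and " + rightField.getType().getMinorType() + " in column " + columnIndex + " of UNION ALL");
      }
      builder.setMinorType(outputMinorType);
    }
    List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
    dataModes.add(leftField.getType().getMode());
    dataModes.add(rightField.getType().getMode());
    builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
    return builder.build();
  }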
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 2be1ed5..a8ee0de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -249,14 +249,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
// OK doesn't change high-level state.
break;
case NONE:
- // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if
- // already terminated (checked above).
- if (validationState != ValidationState.HAVE_SCHEMA) {
- throw new IllegalStateException(
- String.format(
- "next() returned %s without first returning %s [#%d, %s]",
- batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
- }
+ // NONE is allowed even without first seeing an OK_NEW_SCHEMA. Such a NONE is
+ // called a "fast NONE".
// NONE moves to terminal high-level state.
validationState = ValidationState.TERMINAL;
break;
@@ -306,12 +300,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
"Incoming batch [#%d, %s] has a null schema. This is not allowed.",
instNum, batchTypeName));
}
- if (lastSchema.getFieldCount() == 0) {
- throw new IllegalStateException(
- String.format(
- "Incoming batch [#%d, %s] has an empty schema. This is not allowed.",
- instNum, batchTypeName));
- }
+ // It's legal for a batch to have zero fields. For instance, a relational table could have
+ // zero columns; querying such a table requires execution operators to process batches with zero fields.
if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
throw new IllegalStateException(
String.format(
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index 2298df5..a8eddbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -39,6 +39,6 @@ public class ValuesBatchCreator implements BatchCreator<Values> {
assert children.isEmpty();
JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(), null, Collections.singletonList(SchemaPath.getSimplePath("*")));
- return new ScanBatch(config, context, Iterators.singletonIterator((RecordReader) reader));
+ return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 7e4483b..df80a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -42,8 +42,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.type.RelDataType;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import org.apache.drill.exec.util.Utilities;
/**
* GroupScan of a Drill table.
@@ -160,12 +159,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
final ScanStats stats = groupScan.getScanStats(settings);
int columnCount = getRowType().getFieldCount();
double ioCost = 0;
- boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return Preconditions.checkNotNull(input).equals("*");
- }
- }).isPresent();
+ boolean isStarQuery = Utilities.isStarQuery(columns);
if (isStarQuery) {
columnCount = STAR_COLUMN_COST;
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 25cd717..d974bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -35,18 +35,45 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
+/**
+ * A physical Prel node for the Project operator.
+ */
public class ProjectPrel extends DrillProjectRelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectPrel.class);
+ private final boolean outputProj;
public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
RelDataType rowType) {
+ this(cluster, traits, child, exps, rowType, false);
+ }
+
+ /**
+ * Constructor for ProjectPrel.
+ * @param cluster the RelOptCluster this node belongs to
+ * @param traits traits of the ProjectPrel node
+ * @param child input
+ * @param exps list of RexNodes representing the projection expressions
+ * @param rowType output rowType of the projection expressions
+ * @param outputProj true if this ProjectPrel was inserted by {@link org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor}.
+ * Such a top Project operator performs the following processing before the result is presented to the Screen/Writer:
+ * <ol>
+ * <li>ensure final output field names are preserved</li>
+ * <li>handle cases where the input does not return any batch (a fast NONE) (see ProjectRecordBatch.handleNullInput() method)</li>
+ * <li>handle cases where expressions in the upstream operator were evaluated to NULL type
+ * (NULL type will be converted into nullable INT)</li>
+ * </ol>
+ * false otherwise.
+ */
+ public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+ RelDataType rowType, boolean outputProj) {
super(DRILL_PHYSICAL, cluster, traits, child, exps, rowType);
+ this.outputProj = outputProj;
}
@Override
public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> exps, RelDataType rowType) {
- return new ProjectPrel(getCluster(), traitSet, input, exps, rowType);
+ return new ProjectPrel(getCluster(), traitSet, input, exps, rowType, this.outputProj);
}
@@ -57,7 +84,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
org.apache.drill.exec.physical.config.Project p = new org.apache.drill.exec.physical.config.Project(
- this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))), childPOP);
+ this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))), childPOP, outputProj);
return creator.addMetadata(this, p);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 587b006..08bd9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -133,9 +133,23 @@ public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeExcept
prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
- ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
-
- return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject;
+ ProjectPrel topProject = new ProjectPrel(prel.getCluster(),
+ prel.getTraitSet(),
+ prel,
+ projections,
+ newRowType,
+ true); // outputProj = true: convert NONE -> OK_NEW_SCHEMA and handle expressions with NULL type.
+
+ if (prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true)) {
+ return new ProjectPrel(prel.getCluster(),
+ prel.getTraitSet(),
+ ((Project) prel).getInput(),
+ ((Project) prel).getProjects(),
+ prel.getRowType(),
+ true); // outputProj = true: convert NONE -> OK_NEW_SCHEMA and handle expressions with NULL type.
+ } else {
+ return topProject;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
new file mode 100644
index 0000000..1137922
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+ protected final RecordBatch left;
+ protected final RecordBatch right;
+
+ // state (IterOutcome) of the left input
+ protected IterOutcome leftUpstream = IterOutcome.NONE;
+
+ // state (IterOutcome) of the right input
+ protected IterOutcome rightUpstream = IterOutcome.NONE;
+
+ protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left,
+ RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context, true, context.newOperatorContext(popConfig));
+ this.left = left;
+ this.right = right;
+ }
+
+ protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, RecordBatch left,
+ RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context, buildSchema);
+ this.left = left;
+ this.right = right;
+ }
+
+ /**
+ * Prefetch the first batch from both inputs.
+ * @return true if the caller should continue processing;
+ * false if the caller should stop and exit.
+ */
+ protected boolean prefetchFirstBatchFromBothSides() {
+ leftUpstream = next(0, left);
+ rightUpstream = next(1, right);
+
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return false;
+ }
+
+ if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+ state = BatchState.OUT_OF_MEMORY;
+ return false;
+ }
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
+ state = BatchState.DONE;
+ return false;
+ }
+
+ return true;
+ }
+}
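
The new prefetchFirstBatchFromBothSides() hook centralizes first-batch handling for binary operators. A usage sketch follows (hypothetical subclass method; the concrete caller in this commit is UnionAllRecordBatch.buildSchema() above):

  @Override
  protected void buildSchema() throws SchemaChangeException {
    if (!prefetchFirstBatchFromBothSides()) {
      return;  // state was already set to STOP, OUT_OF_MEMORY, or DONE
    }
    // leftUpstream / rightUpstream now hold each side's first IterOutcome,
    // so schema inference can distinguish an empty input (NONE) from a
    // real schema (OK_NEW_SCHEMA).
  }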
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 65d164d..4a9828c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -61,6 +61,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
switch (upstream) {
case NONE:
+ if (state == BatchState.FIRST) {
+ return handleNullInput();
+ }
+ return upstream;
case NOT_YET:
case STOP:
if (state == BatchState.FIRST) {
@@ -125,4 +129,26 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
protected abstract boolean setupNewSchema() throws SchemaChangeException;
protected abstract IterOutcome doWork();
+
+ /**
+ * Default behavior for handling NULL input (aka fast NONE): the incoming batch returns NONE
+ * before ever returning an OK_NEW_SCHEMA. This can happen when the underlying Scan operators
+ * do not produce any batch with a schema.
+ *
+ * <p>
+ * Notice that NULL input is different from an input with an empty batch. In the latter case,
+ * the input provides at least one batch, though it's empty.
+ * </p>
+ *
+ * <p>
+ * This behavior can be overridden in individual operators whose semantics require
+ * injecting a batch with a schema.
+ * </p>
+ *
+ * @return IterOutcome.NONE.
+ */
+ protected IterOutcome handleNullInput() {
+ container.buildSchema(SelectionVectorMode.NONE);
+ return IterOutcome.NONE;
+ }
+
}
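
Operators whose semantics require emitting a schema even when the input never produced one can override this hook. An illustrative override is sketched below — not the actual ProjectRecordBatch code, just the NONE -> OK_NEW_SCHEMA conversion described in the TopProjectVisitor comments above:

  @Override
  protected IterOutcome handleNullInput() {
    // Build the output schema from the operator's own expressions rather
    // than the absent incoming schema, then announce it downstream.
    container.buildSchema(SelectionVectorMode.NONE);
    return IterOutcome.OK_NEW_SCHEMA;
  }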
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
new file mode 100644
index 0000000..9bcea50
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+/**
+ * Wrap a VectorContainer into a record batch.
+ */
+public class SimpleRecordBatch implements RecordBatch {
+ private VectorContainer container;
+ private FragmentContext context;
+
+ public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
+ this.container = container;
+ this.context = context;
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
+ @Override
+ public int getRecordCount() {
+ return container.getRecordCount();
+ }
+
+ @Override
+ public void kill(boolean sendUpstream) {
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public IterOutcome next() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+}
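
A minimal usage sketch (hypothetical; container and context are assumed to be an existing VectorContainer and FragmentContext):

  // Wrap an in-memory container so that APIs expecting a RecordBatch
  // (e.g. ExpressionTreeMaterializer.materialize()) can read its schema
  // and vectors without a full operator pipeline.
  RecordBatch batch = new SimpleRecordBatch(container, context);
  BatchSchema schema = batch.getSchema();  // delegates to the container
  int rowCount = batch.getRecordCount();   // likewise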
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 2152025..3a95d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -26,19 +26,14 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
public abstract class AbstractRecordReader implements RecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);
- private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
- public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
-
// For text reader, the default columns to read is "columns[0]".
protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));
@@ -62,7 +57,7 @@ public abstract class AbstractRecordReader implements RecordReader {
* 2) NULL : is NOT allowed. It requires the planner's rule, or GroupScan or ScanBatchCreator to handle NULL.
*/
protected final void setColumns(Collection<SchemaPath> projected) {
- Preconditions.checkNotNull(projected, COL_NULL_ERROR);
+ Preconditions.checkNotNull(projected, Utilities.COL_NULL_ERROR);
isSkipQuery = projected.isEmpty();
Collection<SchemaPath> columnsToRead = projected;
@@ -73,7 +68,7 @@ public abstract class AbstractRecordReader implements RecordReader {
columnsToRead = getDefaultColumnsToRead();
}
- isStarQuery = isStarQuery(columnsToRead);
+ isStarQuery = Utilities.isStarQuery(columnsToRead);
columns = transformColumns(columnsToRead);
logger.debug("columns to read : {}", columns);
@@ -99,15 +94,6 @@ public abstract class AbstractRecordReader implements RecordReader {
return isSkipQuery;
}
- public static boolean isStarQuery(Collection<SchemaPath> projected) {
- return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
- @Override
- public boolean apply(SchemaPath path) {
- return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
- }
- }).isPresent();
- }
-
@Override
public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
for (final ValueVector v : vectorMap.values()) {
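
The helper removed here moves to org.apache.drill.exec.util.Utilities (along with COL_NULL_ERROR, per the setColumns() change above). A sketch of the relocated method, assuming it keeps the removed Guava-based implementation:

  // Assumed to mirror the AbstractRecordReader.isStarQuery() deleted above;
  // STAR_COLUMN was also removed, so the star path is inlined here.
  public static boolean isStarQuery(Collection<SchemaPath> projected) {
    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR),
        new Predicate<SchemaPath>() {
          @Override
          public boolean apply(SchemaPath path) {
            return Preconditions.checkNotNull(path).equals(SchemaPath.getSimplePath("*"));
          }
        }).isPresent();
  }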
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fa8121e..4b71b0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.fs.Path;
import java.util.List;
@@ -63,7 +64,7 @@ public class ColumnExplorer {
public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
this.columns = columns;
- this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns);
+ this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
this.selectedPartitionColumns = Lists.newArrayList();
this.tableColumns = Lists.newArrayList();
this.allImplicitColumns = initImplicitFileColumns(optionManager);
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 1f7bce9..f81f74e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(scan, context, oContext, readers.iterator(), implicitColumns);
+ return new ScanBatch(scan, context, oContext, readers, implicitColumns);
}
public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
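Note the constructor change: ScanBatch now receives its readers as a List rather than an Iterator, presumably so it can inspect the full reader set (and recognize an empty scan) before iterating. A minimal call-site sketch, assuming a single already-constructed RecordReader named reader:

  // Illustration only: the new list-based call shape.
  List<RecordReader> readers = Collections.singletonList(reader);
  return new ScanBatch(scan, context, oContext, readers, implicitColumns);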
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index d59cda2..8442c32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
@Override
public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
- return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
+ return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index ceb1deb..c406bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -229,7 +229,11 @@ public class JSONRecordReader extends AbstractRecordReader {
handleAndRaise("Error parsing JSON", ex);
}
}
- jsonReader.ensureAtLeastOneField(writer);
+ // Skip an empty JSON file (0 rows).
+ // Ensure the batch has at least one field only when the data source has > 0 rows.
+ if (recordCount > 0) {
+ jsonReader.ensureAtLeastOneField(writer);
+ }
writer.setValueCount(recordCount);
updateRunningCount();
return recordCount;
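With this guard, a JSON file containing no rows no longer fabricates a dummy field just to carry a schema, so empty inputs cannot masquerade as a schema change downstream. A hedged test sketch (hypothetical file path; expectsEmptyResultSet() as provided by TestBuilder):

  // Illustration only: an empty JSON source should yield an empty result
  // without a fabricated column.
  @Test
  public void emptyJsonAddsNoDummyField() throws Exception {
    testBuilder()
        .sqlQuery("select key from dfs.`/tmp/empty.json`")  // assumed path
        .expectsEmptyResultSet()
        .go();
  }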
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index 199119d..60581a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -34,6 +34,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter(), context.getOptions());
- return new ScanBatch(config, context, Collections.singleton(rr).iterator());
+ return new ScanBatch(config, context, Collections.singletonList(rr));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 9a7563a..8f89eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -47,6 +47,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
readers.add(new MockRecordReader(context, e));
}
}
- return new ScanBatch(config, context, readers.iterator());
+ return new ScanBatch(config, context, readers);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 4a8c5f3..5ac10e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -429,7 +429,7 @@ public class Metadata {
List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
- ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+ ALL_COLS.add(Utilities.STAR_COLUMN);
boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 78e9655..84e969a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
@@ -281,7 +281,7 @@ public class ParquetReaderUtility {
// this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
// creating a NameSegment makes sure we are using the standard code for comparing names,
// currently it is all case-insensitive
- if (AbstractRecordReader.isStarQuery(columns)
+ if (Utilities.isStarQuery(columns)
|| new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
int colIndex = -1;
ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
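Per the comment above, column names are compared through NameSegment so the standard (currently case-insensitive) comparison applies; sketched:

  // Illustration only, assuming the case-insensitive behavior the comment
  // above describes.
  PathSegment.NameSegment a = new PathSegment.NameSegment("N_NATIONKEY");
  PathSegment.NameSegment b = new PathSegment.NameSegment("n_nationkey");
  boolean same = a.equals(b);   // expected true under case-insensitive matching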
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 21fc4ef..6017948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -153,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), implicitColumns);
+ return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
}
private static boolean isComplex(ParquetMetadata footer) {
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 9814b53..10187b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
@@ -225,7 +226,7 @@ public class ParquetSchema {
for (int i = 0; i < columnsFound.length; i++) {
SchemaPath col = projectedColumns.get(i);
assert col != null;
- if ( ! columnsFound[i] && ! col.equals(ParquetRecordReader.STAR_COLUMN)) {
+ if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) {
nullFilledVectors.add(createMissingColumn(col, output));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 2b0ef3f..ab87a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -46,6 +46,6 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
final Iterator<Object> iterator = table.getIterator(context);
final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
- return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
+ return new ScanBatch(scan, context, Collections.singletonList(reader));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 6ee3160..35358c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -17,14 +17,23 @@
*/
package org.apache.drill.exec.util;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import java.util.Collection;
+
public class Utilities {
+ public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+ public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
+
public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
/*
* From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
@@ -68,4 +77,18 @@ public class Utilities {
String v = Utilities.class.getPackage().getImplementationVersion();
return v;
}
+
+ /**
+ * Returns true if the given list of schema paths contains the star column.
+ * @param projected the collection of projected schema paths; must not be null
+ * @return true if any projected path equals the star column
+ */
+ public static boolean isStarQuery(Collection<SchemaPath> projected) {
+ return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
+ @Override
+ public boolean apply(SchemaPath path) {
+ return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+ }
+ }).isPresent();
+ }
}
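Usage of the relocated helpers, sketched (ImmutableList from Guava):

  import com.google.common.collect.ImmutableList;
  import org.apache.drill.common.expression.SchemaPath;
  import org.apache.drill.exec.util.Utilities;

  // Illustration only: star detection after the move into Utilities.
  boolean star = Utilities.isStarQuery(
      ImmutableList.of(SchemaPath.getSimplePath("a"), Utilities.STAR_COLUMN)); // true
  boolean noStar = Utilities.isStarQuery(
      ImmutableList.of(SchemaPath.getSimplePath("a")));                        // false
  // isStarQuery(null) fails fast with COL_NULL_ERROR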
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index d836bfc..4de4c2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.record.VectorWrapper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -178,9 +180,20 @@ public class VectorUtil {
}
}
+ public static void allocateVectors(Iterable<ValueVector> valueVectors, int count) {
+ for (final ValueVector v : valueVectors) {
+ AllocationHelper.allocateNew(v, count);
+ }
+ }
+
+ public static void setValueCount(Iterable<ValueVector> valueVectors, int count) {
+ for (final ValueVector v : valueVectors) {
+ v.getMutator().setValueCount(count);
+ }
+ }
+
private static int getColumnWidth(int[] columnWidths, int columnIndex) {
return (columnWidths == null) ? DEFAULT_COLUMN_WIDTH
: (columnWidths.length > columnIndex) ? columnWidths[columnIndex] : columnWidths[0];
}
-
}
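The two new helpers are intended to be used as a pair around a write; a minimal sketch:

  // Illustration only: size each vector, write through the mutators, then
  // stamp the record count on every vector.
  static void fillBatch(Iterable<ValueVector> vectors, int count) {
    VectorUtil.allocateVectors(vectors, count);  // AllocationHelper.allocateNew per vector
    // ... write up to 'count' values via each vector's mutator ...
    VectorUtil.setValueCount(vectors, count);    // finalize the value count
  }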
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 2bc78d4..990a24d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -316,7 +316,9 @@ public class DrillTestWrapper {
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
throws SchemaChangeException, UnsupportedEncodingException {
- return addToCombinedVectorResults(batches, null);
+ Map<String, List<Object>> combinedVectors = new TreeMap<>();
+ addToCombinedVectorResults(batches, null, combinedVectors);
+ return combinedVectors;
}
/**
@@ -324,18 +326,20 @@ public class DrillTestWrapper {
* @param batches
* @param expectedSchema: the expected schema the batches should contain. Throws SchemaChangeException
* if a batch with a different schema is encountered.
- * @return
+ * @param combinedVectors: the map that accumulates the vector values while iterating over the batches.
+ *
+ * @return the number of batches processed
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
- public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
+ public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
- Map<String, List<Object>> combinedVectors = new TreeMap<>();
-
+ int numBatch = 0;
long totalRecords = 0;
BatchSchema schema = null;
for (VectorAccessible loader : batches) {
+ numBatch++;
if (expectedSchema != null) {
if (! expectedSchema.equals(loader.getSchema())) {
throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
@@ -412,12 +416,12 @@ public class DrillTestWrapper {
}
}
}
- return combinedVectors;
+ return numBatch;
}
protected void compareSchemaOnly() throws Exception {
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- List<QueryDataBatch> actual;
+ List<QueryDataBatch> actual = null;
QueryDataBatch batch = null;
try {
test(testOptionSettingQueries);
@@ -448,8 +452,10 @@ public class DrillTestWrapper {
}
} finally {
- if (batch != null) {
- batch.release();
+ if (actual != null) {
+ for (QueryDataBatch b : actual) {
+ b.release();
+ }
}
loader.clear();
}
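Returning the batch count lets a test that accumulates values also assert how many batches arrived, which matters for changes like this one, where an empty input may still surface as a schema-only batch. A minimal sketch, assuming an Iterable<VectorAccessible> named batches is already at hand:

  // Illustration only: the caller now owns the accumulator map.
  Map<String, List<Object>> combined = new TreeMap<>();
  int numBatches = DrillTestWrapper.addToCombinedVectorResults(batches, null, combined);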
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 36a713f..acde8ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -39,6 +39,8 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.util.Text;
@@ -261,6 +263,14 @@ public class TestBuilder {
expectedNumBatches);
}
+ public SchemaTestBuilder schemaBaseLine(BatchSchema batchSchema) {
+ List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = new ArrayList<>();
+ for (final MaterializedField field : batchSchema) {
+ expectedSchema.add(Pair.of(SchemaPath.getSimplePath(field.getName()), field.getType()));
+ }
+ return schemaBaseLine(expectedSchema);
+ }
+
public SchemaTestBuilder schemaBaseLine(List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
assert expectedSchema != null : "The expected schema can be provided once";
assert baselineColumns == null : "The column information should be captured in expected schema, not baselineColumns";
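The new overload derives the expected (path, type) pairs straight from a BatchSchema; sketched usage, assuming a BatchSchema instance obtained elsewhere in the test:

  // Illustration only: baseline the schema without hand-building pairs.
  testBuilder()
      .sqlQuery("select key from cp.`store/json/booleanData.json`")  // assumed query
      .schemaBaseLine(batchSchema)   // BatchSchema -> List<Pair<SchemaPath, MajorType>>
      .build()
      .run();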
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 97df2ee..bbfe093 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -416,7 +416,7 @@ public class TestExampleQueries extends BaseTestQuery {
@Test
public void testCase() throws Exception {
- test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end from cp.`tpch/nation.parquet` ;");
+ test("select case when n_nationkey > 0 and n_nationkey < 2 then concat(n_name, '_abc') when n_nationkey >=2 and n_nationkey < 4 then '_EFG' else concat(n_name,'_XYZ') end, n_comment from cp.`tpch/nation.parquet` ;");
}
@Test // tests join condition that has different input types
@@ -1194,5 +1194,4 @@ public class TestExampleQueries extends BaseTestQuery {
.build()
.run();
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 6965ab5..63d21ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.BufferedWriter;
@@ -586,7 +587,7 @@ public class TestUnionAll extends BaseTestQuery {
}
@Test
- public void testUnionAllRightEmptyBatch() throws Exception {
+ public void testUnionAllRightEmptyDataBatch() throws Exception {
String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
String queryRightEmptyBatch = String.format(
@@ -606,7 +607,7 @@ public class TestUnionAll extends BaseTestQuery {
}
@Test
- public void testUnionAllLeftEmptyBatch() throws Exception {
+ public void testUnionAllLeftEmptyDataBatch() throws Exception {
String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
final String queryLeftBatch = String.format(
@@ -627,7 +628,7 @@ public class TestUnionAll extends BaseTestQuery {
}
@Test
- public void testUnionAllBothEmptyBatch() throws Exception {
+ public void testUnionAllBothEmptyDataBatch() throws Exception {
String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
final String query = String.format(
"select key from dfs_test.`%s` where 1 = 0 " +
@@ -638,7 +639,7 @@ public class TestUnionAll extends BaseTestQuery {
final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
- .setMinorType(TypeProtos.MinorType.INT)
+ .setMinorType(TypeProtos.MinorType.BIT) // field "key" is of boolean (BIT) type
.setMode(TypeProtos.DataMode.OPTIONAL)
.build();
expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));