You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:45 UTC
[08/24] status changes
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 88bada5..5b61a82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -79,64 +79,69 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public IterOutcome next() {
-
- // this is only called on the first batch. Beyond this, the aggregator manages batches.
- if (aggregator == null) {
- IterOutcome outcome = incoming.next();
- logger.debug("Next outcome of {}", outcome);
- switch (outcome) {
- case NONE:
- case NOT_YET:
- case STOP:
- return outcome;
- case OK_NEW_SCHEMA:
- if (!createAggregator()){
- done = true;
- return IterOutcome.STOP;
+ stats.startProcessing();
+ try{
+ // this is only called on the first batch. Beyond this, the aggregator manages batches.
+ if (aggregator == null) {
+ IterOutcome outcome = next(incoming);
+ logger.debug("Next outcome of {}", outcome);
+ switch (outcome) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ return outcome;
+ case OK_NEW_SCHEMA:
+ if (!createAggregator()){
+ done = true;
+ return IterOutcome.STOP;
+ }
+ break;
+ case OK:
+ throw new IllegalStateException("You should never get a first batch without a new schema");
+ default:
+ throw new IllegalStateException(String.format("unknown outcome %s", outcome));
}
- break;
- case OK:
- throw new IllegalStateException("You should never get a first batch without a new schema");
- default:
- throw new IllegalStateException(String.format("unknown outcome %s", outcome));
}
- }
- while(true){
- AggOutcome out = aggregator.doWork();
- logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
- switch(out){
- case CLEANUP_AND_RETURN:
- container.clear();
- done = true;
- return aggregator.getOutcome();
- case RETURN_OUTCOME:
- return aggregator.getOutcome();
- case UPDATE_AGGREGATOR:
- aggregator = null;
- if(!createAggregator()){
- return IterOutcome.STOP;
+ while(true){
+ AggOutcome out = aggregator.doWork();
+ logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+ switch(out){
+ case CLEANUP_AND_RETURN:
+ container.clear();
+ done = true;
+ return aggregator.getOutcome();
+ case RETURN_OUTCOME:
+ return aggregator.getOutcome();
+ case UPDATE_AGGREGATOR:
+ aggregator = null;
+ if(!createAggregator()){
+ return IterOutcome.STOP;
+ }
+ continue;
+ default:
+ throw new IllegalStateException(String.format("Unknown state %s.", out));
}
- continue;
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", out));
}
+ }finally{
+ stats.stopProcessing();
}
-
}
-
-
+
+
/**
* Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
* and informing the context of the failure state, as well as informing the upstream operators.
- *
+ *
* @return true if the aggregator was setup successfully. false if there was a failure.
*/
private boolean createAggregator() {
logger.debug("Creating new aggregator.");
try{
+ stats.startSetup();
this.aggregator = createAggregatorInternal();
+ stats.stopSetup();
return true;
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
context.fail(ex);
@@ -153,13 +158,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
container.clear();
List<VectorAllocator> allocators = Lists.newArrayList();
-
+
LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length];
LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length];
TypedFieldId[] keyOutputIds = new TypedFieldId[popConfig.getKeys().length];
-
+
ErrorCollector collector = new ErrorCollectorImpl();
-
+
for(int i =0; i < keyExprs.length; i++){
NamedExpression ne = popConfig.getKeys()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
@@ -170,38 +175,38 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
allocators.add(VectorAllocator.getAllocator(vector, 50));
keyOutputIds[i] = container.add(vector);
}
-
+
for(int i =0; i < valueExprs.length; i++){
NamedExpression ne = popConfig.getExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
if(expr == null) continue;
-
+
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocators.add(VectorAllocator.getAllocator(vector, 50));
TypedFieldId id = container.add(vector);
valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
}
-
+
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-
+
setupIsSame(cg, keyExprs);
setupIsSameApart(cg, keyExprs);
addRecordValues(cg, valueExprs);
outputRecordKeys(cg, keyOutputIds, keyExprs);
outputRecordKeysPrev(cg, keyOutputIds, keyExprs);
-
+
cg.getBlock("resetValues")._return(JExpr.TRUE);
getIndex(cg);
-
+
container.buildSchema(SelectionVectorMode.NONE);
StreamingAggregator agg = context.getImplementationClass(cg);
agg.setup(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
return agg;
}
-
-
-
+
+
+
private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
@@ -214,19 +219,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
HoldingContainer first = cg.addExpr(expr, false);
cg.setMappingSet(IS_SAME_I2);
HoldingContainer second = cg.addExpr(expr, false);
-
+
LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
HoldingContainer out = cg.addExpr(fh, false);
cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.getEvalBlock()._return(JExpr.TRUE);
}
-
+
private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); // the internal batch changes each time so we need to redo setup.
private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSamePrev", null, null);
private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
-
+
private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
cg.setMappingSet(ISA_B1);
for(LogicalExpression expr : keyExprs){
@@ -242,11 +247,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
cg.getEvalBlock()._return(JExpr.TRUE);
}
-
+
private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
-
+
private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
cg.setMappingSet(EVAL);
for(LogicalExpression ex : valueExprs){
@@ -255,9 +260,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
-
+
private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
-
+
private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
cg.setMappingSet(RECORD_KEYS);
for(int i =0; i < keyExprs.length; i++){
@@ -266,13 +271,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
-
+
private final GeneratorMapping PREVIOUS_KEYS_OUT = GeneratorMapping.create("setupInterior", "outputRecordKeysPrev", null, null);
private final MappingSet RECORD_KEYS_PREV_OUT = new MappingSet("previousIndex", "outIndex", "previous", "outgoing", PREVIOUS_KEYS_OUT, PREVIOUS_KEYS_OUT);
private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
-
+
private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
cg.setMappingSet(RECORD_KEYS_PREV);
@@ -285,11 +290,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
cg.setMappingSet(RECORD_KEYS_PREV_OUT);
HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false);
cg.getBlock(BlockType.EVAL)._if(outerExpression.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
-
+
}
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
-
+
private void getIndex(ClassGenerator<StreamingAggregator> g){
switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE: {
@@ -308,12 +313,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
g.getBlock("getVectorIndex")._return(var.invoke("getIndex").arg(JExpr.direct("recordIndex")));;
return;
}
-
+
default:
throw new IllegalStateException();
-
+
}
-
+
}
@Override
@@ -321,7 +326,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
super.cleanup();
incoming.cleanup();
}
-
+
@Override
protected void killIncoming() {
incoming.kill();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..6b768c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,25 +17,16 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.record.*;
-import org.eigenbase.rel.JoinRelType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
-import com.sun.codemodel.JExpr;
-
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -44,6 +35,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
@@ -51,14 +43,29 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.JoinRelType;
+
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+ private static final int LEFT_INPUT = 0;
+ private static final int RIGHT_INPUT = 1;
+
// Probe side record batch
private final RecordBatch left;
@@ -137,7 +144,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
@Override
public IterOutcome next() {
-
+ stats.startProcessing();
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
@@ -153,7 +160,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
* as well, for the materialization to be successful. This batch will not be used
* till we complete the build phase.
*/
- leftUpstream = left.next();
+ leftUpstream = next(LEFT_INPUT, left);
// Build the hash table, using the build side record batches.
executeBuildPhase();
@@ -200,12 +207,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
for (VectorWrapper<?> wrapper : left) {
wrapper.getValueVector().clear();
}
- leftUpstream = left.next();
+ leftUpstream = next(LEFT_INPUT, left);
while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
for (VectorWrapper<?> wrapper : left) {
wrapper.getValueVector().clear();
}
- leftUpstream = left.next();
+ leftUpstream = next(LEFT_INPUT, left);
}
}
}
@@ -214,6 +221,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return IterOutcome.NONE;
} catch (ClassTransformationException | SchemaChangeException | IOException e) {
+ stats.stopProcessing();
context.fail(e);
killIncoming();
return IterOutcome.STOP;
@@ -256,7 +264,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
//Setup the underlying hash table
- IterOutcome rightUpstream = right.next();
+ IterOutcome rightUpstream = next(RIGHT_INPUT, right);
boolean moreData = true;
@@ -323,7 +331,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
break;
}
// Get the next record batch
- rightUpstream = right.next();
+ rightUpstream = next(RIGHT_INPUT, right);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index c07878a..faca32a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -35,6 +35,9 @@ public final class JoinStatus {
INCOMING, SV4;
}
+ private static final int LEFT_INPUT = 0;
+ private static final int RIGHT_INPUT = 1;
+
public final RecordBatch left;
private int leftPosition;
private IterOutcome lastLeft;
@@ -63,10 +66,18 @@ public final class JoinStatus {
this.joinType = output.getJoinType();
}
+ private final IterOutcome nextLeft(){
+ return outputBatch.next(LEFT_INPUT, left);
+ }
+
+ private final IterOutcome nextRight(){
+ return outputBatch.next(RIGHT_INPUT, right);
+ }
+
public final void ensureInitial(){
if(!initialSet){
- this.lastLeft = left.next();
- this.lastRight = right.next();
+ this.lastLeft = nextLeft();
+ this.lastRight = nextRight();
initialSet = true;
}
}
@@ -148,7 +159,7 @@ public final class JoinStatus {
if (!isLeftPositionInCurrentBatch()) {
leftPosition = 0;
releaseData(left);
- lastLeft = left.next();
+ lastLeft = nextLeft();
return lastLeft == IterOutcome.OK;
}
lastLeft = IterOutcome.OK;
@@ -167,7 +178,7 @@ public final class JoinStatus {
if (!isRightPositionInCurrentBatch()) {
rightPosition = 0;
releaseData(right);
- lastRight = right.next();
+ lastRight = nextRight();
return lastRight == IterOutcome.OK;
}
lastRight = IterOutcome.OK;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 48b7fea..46dea64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -66,9 +66,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
+
public final MappingSet setupMapping =
- new MappingSet("null", "null",
+ new MappingSet("null", "null",
GM("doSetup", "doSetup", null, null),
GM("doSetup", "doSetup", null, null));
public final MappingSet copyLeftMapping =
@@ -96,7 +96,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
GM("doSetup", "doCompareNextLeftKey", null, null),
GM("doSetup", "doCompareNextLeftKey", null, null));
-
+
private final RecordBatch left;
private final RecordBatch right;
private final JoinStatus status;
@@ -104,7 +104,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final JoinRelType joinType;
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
-
+
protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context);
@@ -130,10 +130,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
@Override
public IterOutcome next() {
-
+
// we do this here instead of in the constructor because we don't necessarily want to start consuming on construction.
status.ensureInitial();
-
+
// loop so we can start over again if we find a new batch was created.
while(true){
@@ -153,14 +153,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
logger.debug("NO MORE DATA; returning {} NONE");
return IterOutcome.NONE;
}
-
+
boolean first = false;
if(worker == null){
try {
logger.debug("Creating New Worker");
+ stats.startSetup();
this.worker = generateNewWorker();
first = true;
+ stats.stopSetup();
} catch (ClassTransformationException | IOException | SchemaChangeException e) {
+ stats.stopSetup();
context.fail(new SchemaChangeException(e));
kill();
return IterOutcome.STOP;
@@ -222,13 +225,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
left.cleanup();
right.cleanup();
}
-
- private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
+
+ private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException {
boolean nextLeftIndexDeclared = false;
cg.setMappingSet(compareLeftMapping);
-
+
for (JoinCondition condition : conditions) {
final LogicalExpression leftFieldExpr = condition.getLeft();
@@ -242,52 +245,52 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
////////////////////////////////
cg.setMappingSet(compareLeftMapping);
cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-
+
if (!nextLeftIndexDeclared) {
// int nextLeftIndex = leftIndex + 1;
cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
nextLeftIndexDeclared = true;
- }
+ }
// check if the next key is in this batch
cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
._then()
._return(JExpr.lit(-1));
-
+
// generate VV read expressions
ClassGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
cg.setMappingSet(compareNextLeftMapping); // change mapping from 'leftIndex' to 'nextLeftIndex'
ClassGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
-
+
if (compareThisLeftExprHolder.isOptional()) {
// handle null == null
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
.cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
._then()
._return(JExpr.lit(0));
-
+
// handle null == !null
cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
.cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
._then()
._return(JExpr.lit(1));
}
-
+
// check value equality
-
+
LogicalExpression gh = FunctionGenerationHelper.getComparator(compareThisLeftExprHolder,
compareNextLeftExprHolder,
context.getFunctionRegistry());
HoldingContainer out = cg.addExpr(gh, false);
-
- //If not 0, it means not equal. We return this out value.
+
+ //If not 0, it means not equal. We return this out value.
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
jc._then()._return(out.getValue());
}
-
+
//Pass the equality check for all the join conditions. Finally, return 0.
cg.getEvalBlock()._return(JExpr.lit(0));
- }
-
+ }
+
private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -322,11 +325,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
//generate doCompare() method
/////////////////////////////////////////
generateDoCompare(cg, incomingRecordBatch, incomingLeftRecordBatch, incomingRightRecordBatch, collector);
-
+
//generate doCompareNextLeftKey() method
/////////////////////////////////////////
generateDoCompareNextLeft(cg, incomingRecordBatch, incomingLeftRecordBatch, joinStatus, collector);
-
+
// generate copyLeft()
//////////////////////
cg.setMappingSet(copyLeftMapping);
@@ -394,12 +397,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private void allocateBatch() {
// allocate new batch space.
container.clear();
-
+
//estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0;
int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() : 0;
int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
-
+
// add fields from both batches
if (leftCount > 0) {
@@ -436,11 +439,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
logger.debug("Built joined schema: {}", container.getSchema());
}
- private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
+ private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
-
+
cg.setMappingSet(compareMapping);
-
+
for (JoinCondition condition : conditions) {
final LogicalExpression leftFieldExpr = condition.getLeft();
final LogicalExpression rightFieldExpr = condition.getRight();
@@ -482,37 +485,37 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
.cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
._then()
._return(JExpr.lit(0));
-
+
// handle null == !null
cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
.cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
._then()
._return(JExpr.lit(1));
-
+
} else if (compareLeftExprHolder.isOptional()) {
// handle null == required (null is less than any value)
cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
._then()
._return(JExpr.lit(-1));
-
+
} else if (compareRightExprHolder.isOptional()) {
// handle required == null (null is less than any value)
cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
._then()
._return(JExpr.lit(1));
}
-
+
LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
compareRightExprHolder,
- context.getFunctionRegistry());
+ context.getFunctionRegistry());
HoldingContainer out = cg.addExpr(fh, false);
-
- //If not 0, it means not equal. We return this out value.
+
+ //If not 0, it means not equal. We return this out value.
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
jc._then()._return(out.getValue());
}
-
- //Pass the equality check for all the join conditions. Finally, return 0.
- cg.getEvalBlock()._return(JExpr.lit(0));
+
+ //Pass the equality check for all the join conditions. Finally, return 0.
+ cg.getEvalBlock()._return(JExpr.lit(0));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3d496d3..cc38cbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -27,8 +27,6 @@ import java.util.PriorityQueue;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
@@ -41,19 +39,28 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
import org.eigenbase.rel.RelFieldCollation.Direction;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.sun.codemodel.JArray;
import com.sun.codemodel.JClass;
@@ -92,6 +99,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private List<VectorAllocator> allocators;
private MergingReceiverPOP config;
+ public static enum Metric implements MetricDef{
+ NEXT_WAIT_NANOS;
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
public MergingRecordBatch(FragmentContext context,
MergingReceiverPOP config,
RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
@@ -104,8 +120,21 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.outgoingContainer = new VectorContainer();
}
+ private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+ long startNext = System.nanoTime();
+ RawFragmentBatch b = provider.getNext();
+ if(b != null){
+ stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ }
+ stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
+ return b;
+ }
+
@Override
public IterOutcome next() {
+ stats.startProcessing();
+ try{
+
if (fragProviders.length == 0) return IterOutcome.NONE;
boolean schemaChanged = false;
@@ -131,7 +160,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
for (RawFragmentBatchProvider provider : fragProviders) {
RawFragmentBatch rawBatch = null;
try {
- rawBatch = provider.getNext();
+ rawBatch = getNext(provider);
} catch (IOException e) {
context.fail(e);
return IterOutcome.STOP;
@@ -238,10 +267,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// reached the end of an incoming record batch
RawFragmentBatch nextBatch = null;
try {
- nextBatch = fragProviders[node.batchId].getNext();
+ nextBatch = getNext(fragProviders[node.batchId]);
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
- nextBatch = fragProviders[node.batchId].getNext();
+ nextBatch = getNext(fragProviders[node.batchId]);
}
} catch (IOException e) {
context.fail(e);
@@ -301,6 +330,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return IterOutcome.OK_NEW_SCHEMA;
else
return IterOutcome.OK;
+
+ }finally{
+ stats.stopProcessing();
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 4641de6..f105363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -166,7 +166,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
recordsSampled += incoming.getRecordCount();
outer: while (recordsSampled < recordsToSample) {
- upstream = incoming.next();
+ upstream = next(incoming);
switch (upstream) {
case NONE:
case NOT_YET:
@@ -414,97 +414,102 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
+ /**
+ * Produces the next outgoing batch, bracketed by stats.startProcessing()/stats.stopProcessing().
+ * Order of work: return NONE once upstream is exhausted and the queue is empty; otherwise emit a queued
+ * (sampled) batch as OK_NEW_SCHEMA; otherwise pull from {@code incoming}. On the first pull the partition
+ * vectors are generated and the first queued batch is emitted. The first unsampled batch is promoted from
+ * OK to OK_NEW_SCHEMA.
+ *
+ * @throws RuntimeException if the very first upstream outcome is OK rather than OK_NEW_SCHEMA
+ * @throws UnsupportedOperationException for an upstream outcome not handled by the switch
+ */
@Override
public IterOutcome next() {
- container.zeroVectors();
-
- // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
- // done
- if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
- return IterOutcome.NONE;
-
- // if there are batches on the queue, process them first, rather than calling incoming.next()
- if (batchQueue != null && batchQueue.size() > 0) {
- VectorContainer vc = batchQueue.poll();
- recordCount = vc.getRecordCount();
- try {
-
- // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
- setupNewSchema(vc);
- } catch (SchemaChangeException ex) {
- kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
+ stats.startProcessing();
+ try{
+ container.zeroVectors();
+
+ // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
+ // done
+ if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
+ return IterOutcome.NONE;
+
+ // if there are batches on the queue, process them first, rather than calling incoming.next()
+ if (batchQueue != null && batchQueue.size() > 0) {
+ VectorContainer vc = batchQueue.poll();
+ recordCount = vc.getRecordCount();
+ try {
+
+ // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
+ setupNewSchema(vc);
+ } catch (SchemaChangeException ex) {
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ doWork(vc);
+ vc.zeroVectors();
+ return IterOutcome.OK_NEW_SCHEMA;
}
- doWork(vc);
- vc.zeroVectors();
- return IterOutcome.OK_NEW_SCHEMA;
- }
- // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
- // more incoming
- IterOutcome upstream = incoming.next();
+ // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
+ // more incoming
+ IterOutcome upstream = next(incoming);
- if (this.first && upstream == IterOutcome.OK) {
- throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
- }
-
- // If this is the first iteration, we need to generate the partition vectors before we can proceed
- if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
- if (!getPartitionVectors()){
- cleanup();
- return IterOutcome.STOP;
+ if (this.first && upstream == IterOutcome.OK) {
+ throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
}
- batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
- first = false;
-
- // Now that we have the partition vectors, we immediately process the first batch on the queue
- VectorContainer vc = batchQueue.poll();
- try {
- setupNewSchema(vc);
- } catch (SchemaChangeException ex) {
- kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
+ // If this is the first iteration, we need to generate the partition vectors before we can proceed
+ if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
+ if (!getPartitionVectors()){
+ cleanup();
+ return IterOutcome.STOP;
+ }
+
+ batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
+ first = false;
+
+ // Now that we have the partition vectors, we immediately process the first batch on the queue
+ VectorContainer vc = batchQueue.poll();
+ try {
+ setupNewSchema(vc);
+ } catch (SchemaChangeException ex) {
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ doWork(vc);
+ vc.zeroVectors();
+ // NOTE(review): recordCount is read after vc.zeroVectors() here, but before doWork(vc) in the
+ // queued-batch path above; confirm zeroVectors() does not affect getRecordCount().
+ recordCount = vc.getRecordCount();
+ return IterOutcome.OK_NEW_SCHEMA;
}
- doWork(vc);
- vc.zeroVectors();
- recordCount = vc.getRecordCount();
- return IterOutcome.OK_NEW_SCHEMA;
- }
- // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the
- // first one
- // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
- if (this.startedUnsampledBatches == false) {
- this.startedUnsampledBatches = true;
- if (upstream == IterOutcome.OK)
- upstream = IterOutcome.OK_NEW_SCHEMA;
- }
- switch (upstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- cleanup();
- recordCount = 0;
- return upstream;
- case OK_NEW_SCHEMA:
- try {
- setupNewSchema(incoming);
- } catch (SchemaChangeException ex) {
- kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
+ // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the
+ // first one
+ // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
+ if (this.startedUnsampledBatches == false) {
+ this.startedUnsampledBatches = true;
+ if (upstream == IterOutcome.OK)
+ upstream = IterOutcome.OK_NEW_SCHEMA;
+ }
+ switch (upstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ cleanup();
+ recordCount = 0;
+ return upstream;
+ case OK_NEW_SCHEMA:
+ try {
+ setupNewSchema(incoming);
+ } catch (SchemaChangeException ex) {
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ doWork(incoming);
+ recordCount = incoming.getRecordCount();
+ return upstream; // change if upstream changed, otherwise normal.
+ default:
+ throw new UnsupportedOperationException();
}
- // fall through.
- case OK:
- doWork(incoming);
- recordCount = incoming.getRecordCount();
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ }finally{
+ stats.stopProcessing();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 844d6db..b6b4c33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,11 +20,10 @@ package org.apache.drill.exec.record;
import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -36,12 +35,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected final T popConfig;
protected final FragmentContext context;
protected final OperatorContext oContext;
+ protected final OperatorStats stats;
+ /**
+ * Stores the operator config and fragment context, creates an OperatorContext for them, and keeps a
+ * reference to that context's stats object for use by subclasses.
+ *
+ * @param popConfig the physical operator configuration for this batch
+ * @param context the fragment context this batch runs in
+ * @throws OutOfMemoryException declared by this constructor; presumably raised while creating the OperatorContext
+ */
protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
super();
this.context = context;
this.popConfig = popConfig;
this.oContext = new OperatorContext(popConfig, context);
+ this.stats = oContext.getStats();
}
@Override
@@ -58,6 +59,27 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return popConfig;
}
+ /**
+ * Calls {@code b.next()} on behalf of this operator, attributing the batch to input 0.
+ *
+ * @param b the upstream batch to advance
+ * @return the outcome returned by {@code b.next()}
+ */
+ public final IterOutcome next(RecordBatch b){
+ return next(0, b);
+ }
+
+ /**
+ * Calls {@code b.next()} with this operator's processing timer paused, and records a received batch
+ * in the stats when the outcome is OK or OK_NEW_SCHEMA.
+ *
+ * @param inputIndex index of the input that {@code b} represents, passed to stats.batchReceived
+ * @param b the upstream batch to advance
+ * @return the outcome returned by {@code b.next()}
+ */
+ public final IterOutcome next(int inputIndex, RecordBatch b){
+ stats.stopProcessing();
+ try{
+ IterOutcome next = b.next();
+
+ switch(next){
+ case OK_NEW_SCHEMA:
+ stats.batchReceived(inputIndex, b.getRecordCount(), true);
+ break;
+ case OK:
+ stats.batchReceived(inputIndex, b.getRecordCount(), false);
+ break;
+ default:
+ // no batch was delivered, so there is nothing to record
+ break;
+ }
+
+ return next;
+ }finally{
+ // Resume processing even if b.next() throws: callers stop processing in their own finally block,
+ // and that call must stay paired with a start.
+ stats.startProcessing();
+ }
+ }
+
@Override
public BatchSchema getSchema() {
return container.getSchema();
@@ -75,6 +97,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
oContext.close();
}
+
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index dd2cfe0..13e4ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -29,7 +29,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
protected final RecordBatch incoming;
private boolean first = true;
protected boolean outOfMemory = false;
-
+
public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
@@ -42,36 +42,46 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
+ /**
+ * Advances the single upstream input and processes the batch it delivers.
+ * An OK outcome on the first call is treated as OK_NEW_SCHEMA. On OK_NEW_SCHEMA the schema is set up
+ * (timed with stats.startSetup()/stopSetup()) before doWork() runs; a SchemaChangeException kills the
+ * batch, fails the context and yields STOP. The whole call is bracketed by
+ * stats.startProcessing()/stopProcessing().
+ *
+ * @return the (possibly promoted) upstream outcome, OUT_OF_MEMORY if doWork() flagged it, or STOP on
+ * schema failure
+ * @throws UnsupportedOperationException for an upstream outcome not handled by the switch
+ */
@Override
public IterOutcome next() {
- IterOutcome upstream = incoming.next();
- if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
- first = false;
- switch(upstream){
- case NONE:
- case NOT_YET:
- case STOP:
- return upstream;
- case OUT_OF_MEMORY:
- return upstream;
- case OK_NEW_SCHEMA:
- try{
- setupNewSchema();
- }catch(SchemaChangeException ex){
- kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- doWork();
- if (outOfMemory) {
- outOfMemory = false;
- return IterOutcome.OUT_OF_MEMORY;
+ // Start the timer before entering the try so the finally block only stops a timer that was started.
+ stats.startProcessing();
+ try{
+ IterOutcome upstream = next(incoming);
+ if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
+ first = false;
+ switch(upstream){
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ return upstream;
+ case OUT_OF_MEMORY:
+ return upstream;
+ case OK_NEW_SCHEMA:
+ try{
+ stats.startSetup();
+ try{
+ setupNewSchema();
+ }finally{
+ // stop the setup timer on every exit, including unchecked exceptions
+ stats.stopSetup();
+ }
+ }catch(SchemaChangeException ex){
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ doWork();
+ if (outOfMemory) {
+ outOfMemory = false;
+ return IterOutcome.OUT_OF_MEMORY;
+ }
+ return upstream; // change if upstream changed, otherwise normal.
+ default:
+ throw new UnsupportedOperationException();
}
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ }finally{
+ stats.stopProcessing();
}
+
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 7f607a3..7297dc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,7 +22,6 @@ import java.io.Closeable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMultiMap;
import org.apache.drill.exec.cache.HazelCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
@@ -31,6 +30,8 @@ import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.work.WorkManager;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletHandler;
import com.google.common.io.Closeables;
@@ -71,6 +72,7 @@ public class Drillbit implements Closeable{
final DistributedCache cache;
final WorkManager manager;
final BootStrapContext context;
+ final Server embeddedJetty;
private volatile RegistrationHandle handle;
@@ -89,8 +91,17 @@ public class Drillbit implements Closeable{
this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
this.cache = new HazelCache(config, context.getAllocator());
}
+ // NOTE(review): 474747 is above 65535, the highest valid TCP port. Confirm the intended port;
+ // binding will presumably fail when embeddedJetty.start() is called from run().
+ this.embeddedJetty = new Server(474747);
}
+ /**
+ * Installs an empty ServletHandler as the embedded Jetty server's handler; no servlets are registered.
+ * NOTE(review): nothing added in this change calls this method, so the server starts without it.
+ */
+ private void setupJetty(){
+ ServletHandler handler = new ServletHandler();
+ embeddedJetty.setHandler(handler);
+
+ }
+
+
+
public void run() throws Exception {
coord.start(10000);
DrillbitEndpoint md = engine.start();
@@ -99,6 +110,7 @@ public class Drillbit implements Closeable{
manager.getContext().getStorage().init();
manager.getContext().getOptionManager().init();
handle = coord.register(md);
+ embeddedJetty.start();
}
public void close() {
@@ -109,7 +121,11 @@ public class Drillbit implements Closeable{
} catch (InterruptedException e) {
logger.warn("Interrupted while sleeping during coordination deregistration.");
}
-
+ try {
+ embeddedJetty.stop();
+ } catch (Exception e) {
+ // Best-effort shutdown: keep closing the remaining resources, but log the cause and stack trace.
+ logger.warn("Failure while shutting down embedded jetty server.", e);
+ }
Closeables.closeQuietly(engine);
Closeables.closeQuietly(coord);
Closeables.closeQuietly(manager);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 2965e79..718da23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
@@ -65,7 +66,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
private final String name;
protected final CompressionCodecFactory codecFactory;
private final boolean compressible;
-
+
protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
@@ -80,7 +81,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
this.name = name == null ? defaultName : name;
this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
}
-
+
@Override
public DrillFileSystem getFileSystem() {
return fs;
@@ -90,7 +91,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public DrillbitContext getContext() {
return context;
}
-
+
@Override
public String getName() {
return name;
@@ -99,7 +100,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
/**
* Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will
* only split on file boundaries.
- *
+ *
* @return True if splittable.
*/
public boolean isBlockSplittable() {
@@ -184,7 +185,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
}
-
+
@Override
public FormatPluginConfig getConfig() {
return formatConfig;
@@ -214,5 +215,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of();
}
-
+
+ /** Returns the integer operator type id reported for sub scans (readers) of this format. */
+ public abstract int getReaderOperatorType();
+ /**
+ * Returns the integer operator type id reported for writers of this format. Implementations without a
+ * writer type may throw UnsupportedOperationException (JSONFormatPlugin does).
+ */
+ public abstract int getWriterOperatorType();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 0b3fe0f..5f9226e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
@@ -45,13 +46,13 @@ public class EasySubScan extends AbstractSubScan{
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
private String selectionRoot;
-
+
@JsonCreator
public EasySubScan(
@JsonProperty("files") List<FileWorkImpl> files, //
@JsonProperty("storage") StoragePluginConfig storageConfig, //
@JsonProperty("format") FormatPluginConfig formatConfig, //
- @JacksonInject StoragePluginRegistry engineRegistry, //
+ @JacksonInject StoragePluginRegistry engineRegistry, //
@JsonProperty("columns") List<SchemaPath> columns, //
@JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
@@ -62,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{
this.columns = columns;
this.selectionRoot = selectionRoot;
}
-
+
public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
this.formatPlugin = plugin;
this.files = files;
@@ -74,7 +75,7 @@ public class EasySubScan extends AbstractSubScan{
public String getSelectionRoot() {
return selectionRoot;
}
-
+
@JsonIgnore
public EasyFormatPlugin<?> getFormatPlugin(){
return formatPlugin;
@@ -100,11 +101,15 @@ public class EasySubScan extends AbstractSubScan{
return formatPlugin.getConfig();
}
}
-
+
@JsonProperty("columns")
public List<SchemaPath> getColumns(){
return columns;
}
-
+ /** Returns the operator type id of this sub scan, delegated to the format plugin's reader type. */
+ @Override
+ public int getOperatorType() {
+ return formatPlugin.getReaderOperatorType();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 864ae48..5ca781b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -94,4 +94,9 @@ public class EasyWriter extends AbstractWriter {
// TODO:
return new OperatorCost(1,1,1,1);
}
+
+ /**
+ * Returns the operator type id of this writer, delegated to the format plugin's writer type
+ * (not its reader type, which identifies the sub scan).
+ */
+ @Override
+ public int getOperatorType() {
+ return formatPlugin.getWriterOperatorType();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 0c50898..89694f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.direct;
import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.RecordReader;
public class DirectSubScan extends AbstractSubScan{
@@ -34,4 +35,10 @@ public class DirectSubScan extends AbstractSubScan{
return reader;
}
+ /** Returns the operator type id for a direct sub scan ({@code CoreOperatorType.DIRECT_SUB_SCAN_VALUE}). */
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.DIRECT_SUB_SCAN_VALUE;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 04a9768..e410306 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -22,12 +22,14 @@ import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Lists;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -80,4 +82,16 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
}
}
+
+ /** Returns the operator type id for JSON sub scans ({@code CoreOperatorType.JSON_SUB_SCAN_VALUE}). */
+ @Override
+ public int getReaderOperatorType() {
+ return CoreOperatorType.JSON_SUB_SCAN_VALUE;
+ }
+
+ /**
+ * Not supported for JSON: no writer operator type is returned.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public int getWriterOperatorType() {
+ throw new UnsupportedOperationException();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index f6cc58e..cd28d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -71,7 +72,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
@Override
public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project?
+ return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project?
}
@Override
@@ -127,6 +128,16 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
return true;
return false;
}
-
+
+ }
+
+ /** Returns the operator type id for text sub scans ({@code CoreOperatorType.TEXT_SUB_SCAN_VALUE}). */
+ @Override
+ public int getReaderOperatorType() {
+ return CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+ }
+
+ /** Returns the operator type id for the text writer ({@code CoreOperatorType.TEXT_WRITER_VALUE}). */
+ @Override
+ public int getWriterOperatorType() {
+ return CoreOperatorType.TEXT_WRITER_VALUE;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 7f2d0f1..ecd952c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -25,11 +25,13 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -144,4 +146,9 @@ public class HiveSubScan extends AbstractBase implements SubScan {
public Iterator<PhysicalOperator> iterator() {
return Iterators.emptyIterator();
}
+
+ /** Returns the operator type id for a Hive sub scan ({@code CoreOperatorType.HIVE_SUB_SCAN_VALUE}). */
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.HIVE_SUB_SCAN_VALUE;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index dce4d3b..70e1258 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -18,16 +18,17 @@
package org.apache.drill.exec.store.ischema;
import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class InfoSchemaSubScan extends AbstractSubScan{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaSubScan.class);
-
+
private final SelectedTable table;
-
+
@JsonCreator
public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table) {
this.table = table;
@@ -36,6 +37,10 @@ public class InfoSchemaSubScan extends AbstractSubScan{
public SelectedTable getTable() {
return table;
}
-
-
+
+ /** Returns the operator type id for an info schema sub scan ({@code CoreOperatorType.INFO_SCHEMA_SUB_SCAN_VALUE}). */
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.INFO_SCHEMA_SUB_SCAN_VALUE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
index f616bca..869e40c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractStore;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -51,7 +52,7 @@ public class MockStorePOP extends AbstractStore {
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-
+
}
@Override
@@ -69,7 +70,10 @@ public class MockStorePOP extends AbstractStore {
return new MockStorePOP(child);
}
+ /**
+ * Not supported: the mock store reports no operator type.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public int getOperatorType() {
+ throw new UnsupportedOperationException();
+ }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
index 0753be5..517ad3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -86,7 +87,7 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
// will want to replace these two methods with an interface above for AbstractSubScan
@Override
public boolean isExecutable() {
- return true;
+ return true;
}
@Override
@@ -103,4 +104,9 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
}
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.MOCK_SUB_SCAN_VALUE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index dd5c91c..1f66e9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -140,4 +141,9 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
return columns;
}
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 51e9219..762942d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -19,12 +19,14 @@ package org.apache.drill.exec.store.parquet;
import com.fasterxml.jackson.annotation.*;
import com.google.common.base.Preconditions;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import java.io.IOException;
@@ -88,4 +90,10 @@ public class ParquetWriter extends AbstractWriter {
// TODO:
return new OperatorCost(1,1,1,1);
}
+
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.PARQUET_WRITER_VALUE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 36b7509..3dbb98e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -99,7 +99,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
this.queryRequest = queryRequest;
this.context = new QueryContext(connection.getSession(), queryId, dContext);
this.initiatingClient = connection;
- this.fragmentManager = new QueryManager(new ForemanManagerListener(), dContext.getController());
+ this.fragmentManager = new QueryManager(bee.getContext().getCache(), new ForemanManagerListener(), dContext.getController());
this.bee = bee;
this.state = new AtomicState<QueryState>(QueryState.PENDING) {
@@ -168,7 +168,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
* Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
*/
public void run() {
-
+
final String originalThread = Thread.currentThread().getName();
Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
// convert a run query request into action
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
new file mode 100644
index 0000000..509000f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.work.foreman;
+
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+public class FragmentData {
+ private final boolean isLocal;
+ private volatile FragmentStatus status;
+ private volatile long lastStatusUpdate = 0;
+ private final DrillbitEndpoint endpoint;
+
+ public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
+ super();
+ this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
+ this.endpoint = endpoint;
+ this.isLocal = isLocal;
+ }
+
+ public void setStatus(FragmentStatus status){
+ this.status = status;
+ lastStatusUpdate = System.currentTimeMillis();
+ }
+
+ public FragmentStatus getStatus() {
+ return status;
+ }
+
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ public long getLastStatusUpdate() {
+ return lastStatusUpdate;
+ }
+
+ public DrillbitEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 01b0df8..c9c769a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -20,17 +20,13 @@ package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.physical.impl.ImplCreator;
-import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -51,28 +47,26 @@ import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
-import com.google.common.collect.Maps;
-
/**
- * Each Foreman holds its own fragment manager. This manages the events associated with execution of a particular query across all fragments.
+ * Each Foreman holds its own fragment manager. This manages the events associated with execution of a particular query across all fragments.
*/
public class QueryManager implements FragmentStatusListener{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
-
- public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+
+ private final QueryStatus status;
private final Controller controller;
private ForemanManagerListener foreman;
private AtomicInteger remainingFragmentCount;
private WorkEventBus workBus;
private FragmentExecutor rootRunner;
private volatile QueryId queryId;
-
- public QueryManager(ForemanManagerListener foreman, Controller controller) {
+
+ public QueryManager(DistributedCache cache, ForemanManagerListener foreman, Controller controller) {
super();
this.foreman = foreman;
this.controller = controller;
this.remainingFragmentCount = new AtomicInteger(0);
-
+ this.status = new QueryStatus(cache);
}
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
@@ -90,44 +84,44 @@ public class QueryManager implements FragmentStatusListener{
logger.debug("Setting buffers on root context.");
rootContext.setBuffers(buffers);
// add fragment to local node.
- map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+ status.add(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
logger.debug("Fragment added to local node.");
rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment));
RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
-
+
if(buffers.isDone()){
// if we don't have to wait for any incoming data, start the fragment runner.
bee.addFragmentRunner(fragmentManager.getRunnable());
}else{
// if we do, record the fragment manager in the workBus.
- workBus.setRootFragmentManager(fragmentManager);
+ workBus.setRootFragmentManager(fragmentManager);
}
-
-
+
+
}
// keep track of intermediate fragments (not root or leaf)
for (PlanFragment f : intermediateFragments) {
logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
- map.put(f.getHandle(), new FragmentData(f.getHandle(), f.getAssignment(), false));
+ status.add(f.getHandle(), new FragmentData(f.getHandle(), f.getAssignment(), false));
}
// send remote (leaf) fragments.
for (PlanFragment f : leafFragments) {
sendRemoteFragment(f);
}
-
+
logger.debug("Fragment runs setup is complete.");
}
-
+
private void sendRemoteFragment(PlanFragment fragment){
logger.debug("Sending remote fragment to node {} with data {}", fragment.getAssignment(), fragment.getFragmentJson());
- map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
+ status.add(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
controller.getTunnel(fragment.getAssignment()).sendFragment(listener, fragment);
}
-
-
+
+
@Override
public void statusUpdate(FragmentStatus status) {
logger.debug("New fragment status was provided to Foreman of {}", status);
@@ -151,11 +145,11 @@ public class QueryManager implements FragmentStatusListener{
throw new UnsupportedOperationException();
}
}
-
+
private void updateStatus(FragmentStatus status){
- map.get(status.getHandle()).setStatus(status);
+ this.status.update(status);
}
-
+
private void finished(FragmentStatus status){
updateStatus(status);
int remaining = remainingFragmentCount.decrementAndGet();
@@ -167,15 +161,15 @@ public class QueryManager implements FragmentStatusListener{
foreman.cleanupAndSendResult(result);
}
}
-
+
private void fail(FragmentStatus status){
updateStatus(status);
stopQuery();
QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
foreman.cleanupAndSendResult(result);
}
-
-
+
+
private void stopQuery(){
// Stop all queries with a currently active status.
// for(FragmentData data: map.values()){
@@ -195,13 +189,13 @@ public class QueryManager implements FragmentStatusListener{
// }
// }
}
-
+
public void cancel(){
stopQuery();
}
private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-
+
public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
super(endpoint, handle);
}
@@ -220,15 +214,13 @@ public class QueryManager implements FragmentStatusListener{
}
};
-
+
public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
return new FragmentSubmitListener(endpoint, value);
}
-
-
-
+
private class FragmentSubmitListener extends EndpointListener<Ack, PlanFragment>{
-
+
public FragmentSubmitListener(DrillbitEndpoint endpoint, PlanFragment value) {
super(endpoint, value);
}
@@ -240,44 +232,6 @@ public class QueryManager implements FragmentStatusListener{
}
}
-
-
- private class FragmentData{
- private final boolean isLocal;
- private volatile FragmentStatus status;
- private volatile long lastStatusUpdate = 0;
- private final DrillbitEndpoint endpoint;
-
- public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
- super();
- this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
- this.endpoint = endpoint;
- this.isLocal = isLocal;
- }
-
- public void setStatus(FragmentStatus status){
- this.status = status;
- lastStatusUpdate = System.currentTimeMillis();
- }
-
- public FragmentStatus getStatus() {
- return status;
- }
-
- public boolean isLocal() {
- return isLocal;
- }
-
- public long getLastStatusUpdate() {
- return lastStatusUpdate;
- }
-
- public DrillbitEndpoint getEndpoint() {
- return endpoint;
- }
-
-
- }
private class RootStatusHandler extends AbstractStatusReporter{
@@ -290,7 +244,7 @@ public class QueryManager implements FragmentStatusListener{
QueryManager.this.statusUpdate(status);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
new file mode 100644
index 0000000..09858d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -0,0 +1,33 @@
+package org.apache.drill.exec.work.foreman;
+
+import java.util.Map;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import com.google.common.collect.Maps;
+
+public class QueryStatus {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
+
+ public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+
+ private final String queryId;
+
+ public QueryStatus(QueryId id, DistributedCache cache){
+ this.queryId = QueryIdHelper.getQueryId(id);
+ cache.getMultiMap(QueryStatus.class);
+
+ }
+
+ void add(FragmentHandle handle, FragmentData data){
+ if(map.put(handle, data) != null) throw new IllegalStateException();
+ }
+
+ void update(FragmentStatus status){
+ map.get(status.getHandle()).setStatus(status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
deleted file mode 100644
index c75808d..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl;
-
-import com.google.caliper.runner.CaliperMain;
-
-
-public class PerformanceTests {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PerformanceTests.class);
-
-
- public static void main(String[] args){
- CaliperMain.main(TestExecutionAbstractions.class, args);
- System.out.println("Hello");
- }
-}