You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:48 UTC
[01/10] git commit: DRILL-317: StorageEngineRegistry creates a new
StorageEngine everytime getEngine() is called
Updated Branches:
refs/heads/master 3e2000341 -> 4a83dae35
DRILL-317: StorageEngineRegistry creates a new StorageEngine everytime getEngine() is called
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/622aad0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/622aad0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/622aad0f
Branch: refs/heads/master
Commit: 622aad0fa976608bbf95a573091fa4429d9b0bf5
Parents: 3e20003
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:33:34 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:33:34 2013 -0800
----------------------------------------------------------------------
.../drill/exec/store/StorageEngineRegistry.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/622aad0f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index bd4efcd..4cc7346 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -28,12 +28,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.util.PathScanner;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.server.DrillbitContext;
public class StorageEngineRegistry {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
-
+
private Map<Object, Constructor<? extends StorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends StorageEngine>>();
private Map<StorageEngineConfig, StorageEngine> activeEngines = new HashMap<StorageEngineConfig, StorageEngine>();
@@ -42,7 +41,7 @@ public class StorageEngineRegistry {
init(context.getConfig());
this.context = context;
}
-
+
@SuppressWarnings("unchecked")
public void init(DrillConfig config){
Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
@@ -63,21 +62,23 @@ public class StorageEngineRegistry {
}
}
}
-
- public StorageEngine getEngine(StorageEngineConfig engineConfig) throws ExecutionSetupException{
+
+ public synchronized StorageEngine getEngine(StorageEngineConfig engineConfig) throws ExecutionSetupException{
StorageEngine engine = activeEngines.get(engineConfig);
if(engine != null) return engine;
Constructor<? extends StorageEngine> c = availableEngines.get(engineConfig.getClass());
if(c == null) throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
try {
- return c.newInstance(engineConfig, context);
+ engine = c.newInstance(engineConfig, context);
+ activeEngines.put(engineConfig, engine);
+ return engine;
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
if(t instanceof ExecutionSetupException) throw ((ExecutionSetupException) t);
throw new ExecutionSetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
}
}
-
-
+
+
}
[03/10] git commit: DRILL-312: Modularize
org.apache.drill.exec.physical.impl.ImplCreator using operator creator
registry
Posted by ja...@apache.org.
DRILL-312: Modularize org.apache.drill.exec.physical.impl.ImplCreator using operator creator registry
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b91f2e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b91f2e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b91f2e8a
Branch: refs/heads/master
Commit: b91f2e8a837d7079c305442b41a3f3ef20b9846f
Parents: ba5e652
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:37:40 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:37:40 2013 -0800
----------------------------------------------------------------------
.../exceptions/ExecutionSetupException.java | 10 ++
.../drill/exec/physical/impl/ImplCreator.java | 175 ++-----------------
.../physical/impl/OperatorCreatorRegistry.java | 100 +++++++++++
.../drill/exec/server/DrillbitContext.java | 17 +-
.../apache/drill/exec/client/DumpCatTest.java | 6 +-
.../exec/fn/impl/TestRepeatedFunction.java | 2 +
.../physical/impl/TestComparisonFunctions.java | 1 +
.../exec/physical/impl/TestSimpleFunctions.java | 5 +
.../drill/exec/physical/impl/agg/TestAgg.java | 2 +
.../physical/impl/filter/TestSimpleFilter.java | 3 +
.../exec/physical/impl/join/TestMergeJoin.java | 5 +
.../physical/impl/limit/TestSimpleLimit.java | 6 +
.../impl/project/TestSimpleProjection.java | 2 +
.../exec/physical/impl/sort/TestSimpleSort.java | 3 +
.../physical/impl/svremover/TestSVRemover.java | 2 +
.../impl/trace/TestTraceMultiRecordBatch.java | 4 +-
.../impl/trace/TestTraceOutputDump.java | 4 +-
.../physical/impl/union/TestSimpleUnion.java | 4 +
18 files changed, 177 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java b/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
index 2e50ae5..ae647f1 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ExecutionSetupException.java
@@ -17,9 +17,19 @@
*/
package org.apache.drill.common.exceptions;
+import java.lang.reflect.InvocationTargetException;
+
public class ExecutionSetupException extends DrillException{
+ private static final long serialVersionUID = -6943409010231014085L;
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionSetupException.class);
+
+ public static ExecutionSetupException fromThrowable(String message, Throwable cause) {
+ Throwable t = cause instanceof InvocationTargetException
+ ? ((InvocationTargetException)cause).getTargetException() : cause;
+ if(t instanceof ExecutionSetupException) return ((ExecutionSetupException) t);
+ return new ExecutionSetupException(message, t);
+ }
public ExecutionSetupException() {
super();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 3e4c1eb..2dc5f16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl;
-import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -25,43 +24,8 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.IteratorValidator;
-import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.config.MergingReceiverPOP;
-import org.apache.drill.exec.physical.config.OrderedPartitionSender;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SelectionVectorRemover;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.Trace;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
-import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
-import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
-import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionSenderCreator;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
-import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
-import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
-import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
-import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
-import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
-import org.apache.drill.exec.physical.impl.validate.IteratorValidatorCreator;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.json.JSONScanBatchCreator;
-import org.apache.drill.exec.store.json.JSONSubScan;
-import org.apache.drill.exec.store.mock.MockScanBatchCreator;
-import org.apache.drill.exec.store.mock.MockSubScanPOP;
-import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -69,138 +33,38 @@ import com.google.common.collect.Lists;
/**
* Implementation of the physical operator visitor
*/
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
- // Element creators supported by this visitor
- private MockScanBatchCreator msc = new MockScanBatchCreator();
- private ParquetScanBatchCreator parquetScan = new ParquetScanBatchCreator();
- private ScreenCreator sc = new ScreenCreator();
- private MergingReceiverCreator mrc = new MergingReceiverCreator();
- private RandomReceiverCreator rrc = new RandomReceiverCreator();
- private PartitionSenderCreator hsc = new PartitionSenderCreator();
- private OrderedPartitionSenderCreator opsc = new OrderedPartitionSenderCreator();
- private SingleSenderCreator ssc = new SingleSenderCreator();
- private ProjectBatchCreator pbc = new ProjectBatchCreator();
- private FilterBatchCreator fbc = new FilterBatchCreator();
- private LimitBatchCreator lbc = new LimitBatchCreator();
- private UnionBatchCreator unionbc = new UnionBatchCreator();
- private SVRemoverCreator svc = new SVRemoverCreator();
- private SortBatchCreator sbc = new SortBatchCreator();
- private AggBatchCreator abc = new AggBatchCreator();
- private MergeJoinCreator mjc = new MergeJoinCreator();
- private IteratorValidatorCreator ivc = new IteratorValidatorCreator();
private RootExec root = null;
- private TraceBatchCreator tbc = new TraceBatchCreator();
private ImplCreator(){}
- public RootExec getRoot(){
+ private RootExec getRoot(){
return root;
}
@Override
- public RecordBatch visitProject(Project op, FragmentContext context) throws ExecutionSetupException {
- return pbc.getBatch(context, op, getChildren(op, context));
- }
-
- @Override
- public RecordBatch visitTrace(Trace op, FragmentContext context) throws ExecutionSetupException {
- return tbc.getBatch(context, op, getChildren(op, context));
- }
- @Override
- public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
- Preconditions.checkNotNull(subScan);
+ @SuppressWarnings("unchecked")
+ public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
+ Preconditions.checkNotNull(op);
Preconditions.checkNotNull(context);
- if (subScan instanceof MockSubScanPOP) {
- return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
- } else if (subScan instanceof JSONSubScan) {
- return new JSONScanBatchCreator().getBatch(context, (JSONSubScan) subScan, Collections.<RecordBatch> emptyList());
- } else if (subScan instanceof ParquetRowGroupScan) {
- return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
- } else {
- return super.visitSubScan(subScan, context);
- }
-
- }
-
- @Override
- public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
- if (op instanceof SelectionVectorRemover) {
- return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
+ Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(op.getClass());
+ if (opCreator != null) {
+ if (op instanceof FragmentRoot ) {
+ root = ((RootCreator<PhysicalOperator>)opCreator).getRoot(context, op, getChildren(op, context));
+ return null;
+ } else {
+ return ((BatchCreator<PhysicalOperator>)opCreator).getBatch(context, op, getChildren(op, context));
+ }
} else {
- return super.visitOp(op, context);
+ throw new UnsupportedOperationException(String.format(
+ "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.",
+ this.getClass().getCanonicalName(), op.getClass().getCanonicalName()));
}
}
- @Override
- public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
- return sbc.getBatch(context, sort, getChildren(sort, context));
- }
-
- @Override
- public RecordBatch visitLimit(Limit limit, FragmentContext context) throws ExecutionSetupException {
- return lbc.getBatch(context, limit, getChildren(limit, context));
- }
-
- @Override
- public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
- return mjc.getBatch(context, op, getChildren(op, context));
- }
-
- @Override
- public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
- Preconditions.checkArgument(root == null);
- root = sc.getRoot(context, op, getChildren(op, context));
- return null;
- }
-
- @Override
- public RecordBatch visitHashPartitionSender(HashPartitionSender op, FragmentContext context) throws ExecutionSetupException {
- root = hsc.getRoot(context, op, getChildren(op, context));
- return null;
- }
-
- @Override
- public RecordBatch visitOrderedPartitionSender(OrderedPartitionSender op, FragmentContext context) throws ExecutionSetupException {
- root = opsc.getRoot(context, op, getChildren(op, context));
- return null;
- }
-
- @Override
- public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
- return fbc.getBatch(context, filter, getChildren(filter, context));
- }
-
-
- @Override
- public RecordBatch visitStreamingAggregate(StreamingAggregate config, FragmentContext context)
- throws ExecutionSetupException {
- return abc.getBatch(context, config, getChildren(config, context));
- }
-
- @Override
- public RecordBatch visitUnion(Union union, FragmentContext context) throws ExecutionSetupException {
- return unionbc.getBatch(context, union, getChildren(union, context));
- }
-
- @Override
- public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
- root = ssc.getRoot(context, op, getChildren(op, context));
- return null;
- }
-
- @Override
- public RecordBatch visitRandomReceiver(RandomReceiver op, FragmentContext context) throws ExecutionSetupException {
- return rrc.getBatch(context, op, null);
- }
-
- @Override
- public RecordBatch visitMergingReceiver(MergingReceiverPOP op, FragmentContext context) throws ExecutionSetupException {
- return mrc.getBatch(context, op, null);
- }
-
private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
List<RecordBatch> children = Lists.newArrayList();
for (PhysicalOperator child : op) {
@@ -209,11 +73,6 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
return children;
}
- @Override
- public RecordBatch visitIteratorValidator(IteratorValidator op, FragmentContext context) throws ExecutionSetupException {
- return ivc.getBatch(context, op, getChildren(op, context));
- }
-
public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
ImplCreator i = new ImplCreator();
boolean isAssertEnabled = false;
@@ -227,6 +86,4 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
"The provided fragment did not have a root node that correctly created a RootExec value.");
return i.getRoot();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
new file mode 100644
index 0000000..8c768e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.PathScanner;
+
+public class OperatorCreatorRegistry {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCreatorRegistry.class);
+
+ private volatile Map<Class<?>, Constructor<?>> constructorRegistry = new HashMap<Class<?>, Constructor<?>>();
+ private volatile Map<Class<?>, Object> instanceRegistry = new HashMap<Class<?>, Object>();
+
+ public OperatorCreatorRegistry(DrillConfig config) {
+ addImplemntorsToMap(config, BatchCreator.class);
+ addImplemntorsToMap(config, RootCreator.class);
+ logger.debug("Adding Operator Creator map: {}", constructorRegistry);
+ }
+
+ public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
+ Object opCreator = instanceRegistry.get(operator);
+ if (opCreator != null) return opCreator;
+
+ Constructor<?> c = constructorRegistry.get(operator);
+ if(c == null) {
+ throw new ExecutionSetupException(
+ String.format("Failure finding OperatorCreator constructor for config %s", operator.getCanonicalName()));
+ }
+ try {
+ opCreator = c.newInstance();
+ instanceRegistry.put(operator, opCreator);
+ return opCreator;
+ } catch (Throwable t) {
+ throw ExecutionSetupException.fromThrowable(
+ String.format("Failure creating OperatorCreator for Operator %s", operator.getCanonicalName()), t);
+ }
+ }
+
+ private <T> void addImplemntorsToMap(DrillConfig config, Class<T> baseInterface) {
+ Class<?>[] providerClasses = PathScanner.scanForImplementationsArr(baseInterface,
+ config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
+ for (Class<?> c : providerClasses) {
+ Class<?> operatorClass = c;
+ boolean interfaceFound = false;
+ while (!interfaceFound && !(c.equals(java.lang.Object.class))) {
+ Type[] ifaces = c.getGenericInterfaces(); // never returns null
+ for (Type iface : ifaces) {
+ if (!(iface instanceof ParameterizedType
+ && ((ParameterizedType)iface).getRawType().equals(baseInterface))) {
+ continue;
+ }
+ Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
+ interfaceFound = true;
+ boolean constructorFound = false;
+ for(Constructor<?> constructor : operatorClass.getConstructors()){
+ Class<?>[] params = constructor.getParameterTypes();
+ if(params.length == 0){
+ Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
+ if (old != null) {
+ throw new RuntimeException(
+ String.format("Duplicate OperatorCreator [%s, %s] found for PhysicalOperator %s",
+ old.getDeclaringClass().getCanonicalName(), operatorClass.getCanonicalName(),
+ ((Class<?>) args[0]).getCanonicalName()));
+ }
+ constructorFound = true;
+ }
+ }
+ if(!constructorFound){
+ logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
+ operatorClass.getCanonicalName());
+ }
+ }
+ c = c.getSuperclass();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 080fd70..4656590 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -24,21 +24,18 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.StorageEngineRegistry;
-import com.google.common.base.Preconditions;
import com.codahale.metrics.MetricRegistry;
-
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import com.google.common.base.Preconditions;
public class DrillbitContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -51,6 +48,7 @@ public class DrillbitContext {
private final DistributedCache cache;
private final DrillbitEndpoint endpoint;
private final StorageEngineRegistry storageEngineRegistry;
+ private final OperatorCreatorRegistry operatorCreatorRegistry;
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) {
super();
@@ -65,6 +63,7 @@ public class DrillbitContext {
this.endpoint = endpoint;
this.storageEngineRegistry = new StorageEngineRegistry(this);
this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
+ this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
}
public DrillbitEndpoint getEndpoint(){
@@ -82,7 +81,11 @@ public class DrillbitContext {
public BufferAllocator getAllocator(){
return context.getAllocator();
}
-
+
+ public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+ return operatorCreatorRegistry;
+ }
+
public StorageEngine getStorageEngine(StorageEngineConfig config) throws ExecutionSetupException {
return storageEngineRegistry.getEngine(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index a78ffc3..0ecab3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -27,24 +27,21 @@ import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecConstants;
-
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
-
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
-
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
@@ -71,6 +68,7 @@ public class DumpCatTest {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
index 3ec9492..aa67ba5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -59,6 +60,7 @@ public class TestRepeatedFunction {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 54bf0fd..4dea33d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -56,6 +56,7 @@ public class TestComparisonFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index 1776d8d..411f21c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -59,6 +59,7 @@ public class TestSimpleFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -86,6 +87,7 @@ public class TestSimpleFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -113,6 +115,7 @@ public class TestSimpleFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -152,6 +155,7 @@ public class TestSimpleFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -191,6 +195,7 @@ public class TestSimpleFunctions {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
index b18ef71..4eae66f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -54,6 +55,7 @@ public class TestAgg {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index 14d68f3..d1c756f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -56,6 +57,7 @@ public class TestSimpleFilter {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
@@ -80,6 +82,7 @@ public class TestSimpleFilter {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 09b7ebe..41cb034 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -63,6 +64,7 @@ public class TestMergeJoin {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -112,6 +114,7 @@ public class TestMergeJoin {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
@@ -164,6 +167,7 @@ public class TestMergeJoin {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
@@ -216,6 +220,7 @@ public class TestMergeJoin {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 89d909d..b254fc0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -20,9 +20,11 @@ package org.apache.drill.exec.physical.impl.limit;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.codahale.metrics.MetricRegistry;
+
import junit.framework.Assert;
import mockit.Injectable;
import mockit.NonStrictExpectations;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -30,6 +32,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -50,6 +53,7 @@ public class TestSimpleLimit {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
verifyLimitCount(bitContext, connection, "test1.json", 5);
@@ -60,6 +64,7 @@ public class TestSimpleLimit {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
verifyLimitCount(bitContext, connection, "test3.json", 95);
@@ -70,6 +75,7 @@ public class TestSimpleLimit {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
verifyLimitCount(bitContext, connection, "test2.json", 69999);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index ca38d9c..72dbe6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -59,6 +60,7 @@ public class TestSimpleProjection {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
index 426aa3a..4472668 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -58,6 +59,7 @@ public class TestSimpleSort {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
@@ -105,6 +107,7 @@ public class TestSimpleSort {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index 2da96d7..5d2b67c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -58,6 +59,7 @@ public class TestSVRemover {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index dd7d006..a3d923e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.trace;
import static org.junit.Assert.*;
-
import mockit.Injectable;
import mockit.NonStrictExpectations;
@@ -30,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -37,7 +37,6 @@ import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
-
import org.junit.Test;
import com.google.common.base.Charsets;
@@ -65,6 +64,7 @@ public class TestTraceMultiRecordBatch {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index f4e6180..7643cf8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.trace;
import static org.junit.Assert.*;
-
import mockit.Injectable;
import mockit.NonStrictExpectations;
@@ -31,6 +30,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -40,7 +40,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -79,6 +78,7 @@ public class TestTraceOutputDump {
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
bitContext.getConfig(); result = c;
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b91f2e8a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index f74c66f..fe90ad4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.physical.impl.union;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.codahale.metrics.MetricRegistry;
+
import mockit.Injectable;
import mockit.NonStrictExpectations;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -29,6 +31,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -54,6 +57,7 @@ public class TestSimpleUnion {
new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
}};
[02/10] git commit: DRILL-311: Replace OrderedPartitionBatchCreator
with OrderedPartitionSenderCreator
Posted by ja...@apache.org.
DRILL-311: Replace OrderedPartitionBatchCreator with OrderedPartitionSenderCreator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ba5e6520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ba5e6520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ba5e6520
Branch: refs/heads/master
Commit: ba5e65207bac38519bc199ed95535932abab2908
Parents: 622aad0
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun Dec 1 19:35:46 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:35:46 2013 -0800
----------------------------------------------------------------------
.../drill/exec/physical/impl/ImplCreator.java | 27 +++++++----
.../OrderedPartitionBatchCreator.java | 39 ----------------
.../OrderedPartitionSenderCreator.java | 47 ++++++++++++++++++++
.../PartitionSenderRootExec.java | 2 +-
4 files changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index efc0f5b..3e4c1eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,12 +26,27 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionBatchCreator;
+import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionSenderCreator;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -64,10 +79,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private MergingReceiverCreator mrc = new MergingReceiverCreator();
private RandomReceiverCreator rrc = new RandomReceiverCreator();
private PartitionSenderCreator hsc = new PartitionSenderCreator();
- private OrderedPartitionBatchCreator opc = new OrderedPartitionBatchCreator();
+ private OrderedPartitionSenderCreator opsc = new OrderedPartitionSenderCreator();
private SingleSenderCreator ssc = new SingleSenderCreator();
private ProjectBatchCreator pbc = new ProjectBatchCreator();
- private OrderedPartitionBatchCreator smplbc = new OrderedPartitionBatchCreator();
private FilterBatchCreator fbc = new FilterBatchCreator();
private LimitBatchCreator lbc = new LimitBatchCreator();
private UnionBatchCreator unionbc = new UnionBatchCreator();
@@ -150,10 +164,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
@Override
public RecordBatch visitOrderedPartitionSender(OrderedPartitionSender op, FragmentContext context) throws ExecutionSetupException {
- List<RecordBatch> children = Lists.newArrayList();
- children.add(opc.getBatch(context, op, getChildren(op, context)));
- HashPartitionSender config = new HashPartitionSender(op.getOppositeMajorFragmentId(), op, op.getRef(),op.getDestinations());
- root = hsc.getRoot(context, config, children);
+ root = opsc.getRoot(context, op, getChildren(op, context));
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
deleted file mode 100644
index 615cf21..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.orderedpartitioner;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.OrderedPartitionSender;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import java.util.List;
-
-public class OrderedPartitionBatchCreator implements BatchCreator<OrderedPartitionSender>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.size() == 1);
- return new OrderedPartitionRecordBatch(config, children.iterator().next(), context);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
new file mode 100644
index 0000000..c0ba8f9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.orderedpartitioner;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.impl.RootCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> {
+
+ @Override
+ public RootExec getRoot(FragmentContext context, OrderedPartitionSender config,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 1);
+
+ List<RecordBatch> ordered_children = Lists.newArrayList();
+ ordered_children.add(new OrderedPartitionRecordBatch(config, children.iterator().next(), context));
+ HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations());
+ return new PartitionSenderRootExec(context, ordered_children.iterator().next(), hpc);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 19adee7..bc53bd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -45,7 +45,7 @@ import com.sun.codemodel.JMod;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
-class PartitionSenderRootExec implements RootExec {
+public class PartitionSenderRootExec implements RootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
private RecordBatch incoming;
[05/10] DRILL-274: Spooling batch buffer
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json b/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
new file mode 100644
index 0000000..143331d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/work/batch/multiple_exchange.json
@@ -0,0 +1,48 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 1000000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "union-exchange"
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "union-exchange"
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "union-exchange"
+ },
+ {
+ @id: 5,
+ child: 4,
+ pop: "filter",
+ expr: "red < 9000000000000000000"
+ },
+ {
+ @id: 6,
+ child: 5,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
[10/10] git commit: DRILL-293: Sort operator throws NPE on empty
record batch
Posted by ja...@apache.org.
DRILL-293: Sort operator throws NPE on empty record batch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4a83dae3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4a83dae3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4a83dae3
Branch: refs/heads/master
Commit: 4a83dae35f8782a9737b31723769724f3b274894
Parents: 366bf8e
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:11:27 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:11:27 2013 -0800
----------------------------------------------------------------------
.../org/apache/drill/exec/physical/impl/sort/SortBatch.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a83dae3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 9cefce1..7881115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -134,10 +134,11 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
}
- if (builder == null)
+ if (builder == null){
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
-
+ }
+
builder.build(context);
sv4 = builder.getSv4();
[07/10] git commit: DRILL-303: RecordBatchLoader.load always creates
new schema
Posted by ja...@apache.org.
DRILL-303: RecordBatchLoader.load always creates new schema
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b12c0b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b12c0b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b12c0b15
Branch: refs/heads/master
Commit: b12c0b155698a3d2ecd5cf3bfdf994fccd65f8d6
Parents: 2c811a8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:06:05 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:06:05 2013 -0800
----------------------------------------------------------------------
.../drill/exec/record/RecordBatchLoader.java | 17 +++---
.../exec/physical/impl/TestUnionExchange.java | 62 ++++++++++++++++++++
.../test/resources/sender/union_exchange.json | 47 +++++++++++++++
3 files changed, 116 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 016f340..f19184f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -62,10 +62,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
this.valueCount = def.getRecordCount();
boolean schemaChanged = schema == null;
- Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
+ Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
for(VectorWrapper<?> w : container){
ValueVector v = w.getValueVector();
- oldFields.put(v.getField(), v);
+ oldFields.put(v.getField().getDef(), v);
}
VectorContainer newVectors = new VectorContainer();
@@ -76,15 +76,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
for (FieldMetadata fmd : fields) {
FieldDef fieldDef = fmd.getDef();
ValueVector v = oldFields.remove(fieldDef);
- if(v != null){
- container.add(v);
- continue;
+ if(v == null) {
+ // if we arrive here, we didn't have a matching vector.
+ schemaChanged = true;
+ MaterializedField m = new MaterializedField(fieldDef);
+ v = TypeHelper.getNewVector(m, allocator);
}
-
- // if we arrive here, we didn't have a matching vector.
- schemaChanged = true;
- MaterializedField m = new MaterializedField(fieldDef);
- v = TypeHelper.getNewVector(m, allocator);
if (fmd.getValueCount() == 0){
v.clear();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
new file mode 100644
index 0000000..2e16b47
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestUnionExchange extends PopUnitTestBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionExchange.class);
+
+ @Test
+ public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+ Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+ bit1.run();
+ bit2.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/sender/union_exchange.json"),
+ Charsets.UTF_8));
+ int count = 0;
+ for(QueryResultBatch b : results) {
+ if (b.getHeader().getRowCount() != 0)
+ count += b.getHeader().getRowCount();
+ }
+ assertEquals(150, count);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/resources/sender/union_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/union_exchange.json b/exec/java-exec/src/test/resources/sender/union_exchange.json
new file mode 100644
index 0000000..76963bd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/union_exchange.json
@@ -0,0 +1,47 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"}
+ ]},
+ {records: 200, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "union-exchange"
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "filter",
+ expr: "alternate()"
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop:"selection-vector-remover"
+ },
+ {
+ @id: 5,
+ child: 4,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
[08/10] git commit: DRILL-301: Join two tables hit
IndexOutOfBoundsException
Posted by ja...@apache.org.
DRILL-301: Join two tables hit IndexOutOfBoundsException
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/316ce8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/316ce8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/316ce8a6
Branch: refs/heads/master
Commit: 316ce8a6f8f94c31574e7107be26addcfc92dc7f
Parents: b12c0b1
Author: Ben Becker <be...@gmail.com>
Authored: Sun Dec 1 20:07:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:07:45 2013 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/JoinTemplate.java | 28 +++++++++++---------
.../exec/physical/impl/join/JoinWorker.java | 2 +-
.../exec/physical/impl/join/MergeJoinBatch.java | 20 ++++++++------
3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index b7fdbf3..aae1a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -83,8 +83,9 @@ public abstract class JoinTemplate implements JoinWorker {
/**
* Copy rows from the input record batches until the output record batch is full
* @param status State of the join operation (persists across multiple record batches/schema changes)
+ * @return true of join succeeded; false if the worker needs to be regenerated
*/
- public final void doJoin(final JoinStatus status) {
+ public final boolean doJoin(final JoinStatus status) {
while (true) {
// for each record
@@ -93,14 +94,15 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
- doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+ if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
+ return false;
status.advanceLeft();
}
}
- return;
+ return true;
}
if (!status.isLeftPositionAllowed())
- return;
+ return true;
int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
switch (comparison) {
@@ -108,7 +110,9 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
- doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+ if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
+ return false;
+ }
status.advanceLeft();
continue;
@@ -133,10 +137,10 @@ public abstract class JoinTemplate implements JoinWorker {
do {
// copy all equal right keys to the output record batch
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
- return;
+ return false;
if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
- return;
+ return false;
// If the left key has duplicates and we're about to cross a boundary in the right batch, add the
// right table's record batch to the sv4 builder before calling next. These records will need to be
@@ -167,7 +171,7 @@ public abstract class JoinTemplate implements JoinWorker {
status.ok = false;
}
// return to indicate recompile in right-sv4 mode
- return;
+ return true;
}
continue;
@@ -193,8 +197,8 @@ public abstract class JoinTemplate implements JoinWorker {
/**
* Copy the data to the new record batch (if it fits).
*
- * @param leftPosition position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
- * @param outputPosition position of the output record batch
+ * @param leftIndex position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
+ * @param outIndex position of the output record batch
* @return Whether or not the data was copied.
*/
public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
@@ -205,8 +209,8 @@ public abstract class JoinTemplate implements JoinWorker {
* Compare the values of the left and right join key to determine whether the left is less than, greater than
* or equal to the right.
*
- * @param leftPosition
- * @param rightPosition
+ * @param leftIndex
+ * @param rightIndex
* @return 0 if both keys are equal
* -1 if left is < right
* 1 if left is > right
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 4374cef..8643d66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -30,7 +30,7 @@ public interface JoinWorker {
}
public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
- public void doJoin(JoinStatus status);
+ public boolean doJoin(JoinStatus status);
public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/316ce8a6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1e20e91..f3a32cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -166,7 +166,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
// join until we have a complete outgoing batch
- worker.doJoin(status);
+ if (!worker.doJoin(status))
+ worker = null;
// get the outcome of the join.
switch(status.getOutcome()){
@@ -353,10 +354,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(vw.getField().getType(),vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
+ cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
.arg(COPY_LEFT_MAPPING.getValueReadIndex())
.arg(COPY_LEFT_MAPPING.getValueWriteIndex())
- .arg(vvIn));
+ .arg(vvIn).eq(JExpr.FALSE))
+ ._then()
+ ._return(JExpr.FALSE);
++vectorId;
}
cg.getEvalBlock()._return(JExpr.lit(true));
@@ -372,10 +375,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
new TypedFieldId(vw.getField().getType(),vectorId));
// todo: check result of copyFromSafe and grow allocation
- cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
- .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
- .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
- .arg(vvIn));
+ cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+ .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+ .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+ .arg(vvIn).eq(JExpr.FALSE))
+ ._then()
+ ._return(JExpr.FALSE);
++vectorId;
}
cg.getEvalBlock()._return(JExpr.lit(true));
@@ -388,7 +393,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private void allocateBatch() {
// allocate new batch space.
container.clear();
-
// add fields from both batches
for (VectorWrapper<?> w : left) {
ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
[06/10] git commit: DRILL-274: Spooling batch buffer
Posted by ja...@apache.org.
DRILL-274: Spooling batch buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2c811a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2c811a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2c811a83
Branch: refs/heads/master
Commit: 2c811a83b30295fb39e1540ce76fbf54768ed50c
Parents: 6c0389f
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 20:04:44 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:04:44 2013 -0800
----------------------------------------------------------------------
distribution/src/resources/drill-override.conf | 35 ++-
.../org/apache/drill/exec/ExecConstants.java | 5 +
.../apache/drill/exec/ops/FragmentContext.java | 10 +-
.../exec/physical/impl/WireRecordBatch.java | 15 +-
.../impl/mergereceiver/MergingRecordBatch.java | 15 +-
.../exec/record/RawFragmentBatchProvider.java | 4 +-
.../exec/store/LocalSyncableFileSystem.java | 183 ++++++++++++++
.../work/batch/AbstractFragmentCollector.java | 22 +-
.../drill/exec/work/batch/BatchCollector.java | 4 +-
.../exec/work/batch/BitComHandlerImpl.java | 2 +-
.../drill/exec/work/batch/IncomingBuffers.java | 24 +-
.../drill/exec/work/batch/MergingCollector.java | 5 +-
.../exec/work/batch/PartitionedCollector.java | 5 +-
.../drill/exec/work/batch/RawBatchBuffer.java | 4 +-
.../exec/work/batch/SpoolingRawBatchBuffer.java | 250 +++++++++++++++++++
.../work/batch/UnlimitedRawBatchBuffer.java | 4 +
.../work/foreman/RunningFragmentManager.java | 6 +-
.../work/fragment/RemoteFragmentHandler.java | 5 +-
.../src/main/resources/drill-module.conf | 11 +-
.../apache/drill/exec/client/DumpCatTest.java | 2 +-
.../exec/fn/impl/TestRepeatedFunction.java | 2 +-
.../physical/impl/TestComparisonFunctions.java | 2 +-
.../exec/physical/impl/TestOptiqPlans.java | 4 +-
.../exec/physical/impl/TestSimpleFunctions.java | 10 +-
.../drill/exec/physical/impl/agg/TestAgg.java | 2 +-
.../physical/impl/filter/TestSimpleFilter.java | 4 +-
.../exec/physical/impl/join/TestMergeJoin.java | 8 +-
.../physical/impl/limit/TestSimpleLimit.java | 4 +-
.../impl/project/TestSimpleProjection.java | 2 +-
.../exec/physical/impl/sort/TestSimpleSort.java | 4 +-
.../physical/impl/svremover/TestSVRemover.java | 2 +-
.../impl/trace/TestTraceMultiRecordBatch.java | 2 +-
.../impl/trace/TestTraceOutputDump.java | 2 +-
.../physical/impl/union/TestSimpleUnion.java | 2 +-
.../apache/drill/exec/work/batch/FileTest.java | 66 +++++
.../exec/work/batch/TestSpoolingBuffer.java | 62 +++++
.../src/test/resources/drill-module.conf | 4 +-
.../test/resources/drill-spool-test-module.conf | 83 ++++++
.../resources/work/batch/multiple_exchange.json | 48 ++++
39 files changed, 851 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index 18a2a6a..5fe2362 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -16,6 +16,9 @@
// This file tells Drill to consider this module when class path scanning.
// This file can also include any supplementary configuration information.
// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
drill.exec: {
cluster-id: "drillbits1"
rpc: {
@@ -46,11 +49,19 @@ drill.exec: {
optimizer: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
},
+ functions: ["org.apache.drill.expr.fn.impl"],
storage: {
- packages += "org.apache.drill.exec.store"
- }
- metrics : {
- context: "drillbit"
+ packages += "org.apache.drill.exec.store"
+ },
+ metrics : {
+ context: "drillbit",
+ jmx: {
+ enabled : true
+ },
+ log: {
+ enabled : false,
+ interval : 60
+ }
},
zk: {
connect: "localhost:2181",
@@ -60,7 +71,7 @@ drill.exec: {
retry: {
count: 7200,
delay: 500
- }
+ }
},
functions: ["org.apache.drill.expr.fn.impl"],
network: {
@@ -70,8 +81,18 @@ drill.exec: {
max.width.per.endpoint: 5,
global.max.width: 100,
executor.threads: 4
- }
+ },
trace: {
- directory: "/tmp"
+ directory: "/var/log/drill",
+ filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 100000000
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 36504f6..5336c0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -45,4 +45,9 @@ public interface ExecConstants {
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
+ public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
+ public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
+ public static final String SPOOLING_BUFFER_IMPL = "drill.exec.spooling.impl";
+ public static final String SPOOLING_BUFFER_DELETE = "drill.exec.spooling.delete";
+ public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.spooling.size";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 674dafc..f75cf5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
@@ -57,14 +58,14 @@ public class FragmentContext {
public final Timer fragmentTime;
private final FragmentHandle handle;
private final UserClientConnection connection;
- private final IncomingBuffers buffers;
+ private IncomingBuffers buffers;
private volatile Throwable failureCause;
private volatile boolean failed = false;
private final FunctionImplementationRegistry funcRegistry;
private final QueryClassLoader loader;
private final ClassTransformer transformer;
- public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers, FunctionImplementationRegistry funcRegistry) {
+ public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) {
this.loader = new QueryClassLoader(true);
this.transformer = new ClassTransformer();
this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
@@ -74,10 +75,13 @@ public class FragmentContext {
this.context = dbContext;
this.connection = connection;
this.handle = handle;
- this.buffers = buffers;
this.funcRegistry = funcRegistry;
}
+ public void setBuffers(IncomingBuffers buffers) {
+ this.buffers = buffers;
+ }
+
public void fail(Throwable cause) {
logger.debug("Fragment Context received failure. {}", cause);
failed = true;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 0b0214a..6a16367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
@@ -96,14 +97,14 @@ public class WireRecordBatch implements RecordBatch{
@Override
public IterOutcome next() {
- RawFragmentBatch batch = fragProvider.getNext();
+ try{
+ RawFragmentBatch batch = fragProvider.getNext();
- // skip over empty batches. we do this since these are basically control messages.
- while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
- batch = fragProvider.getNext();
- }
+ // skip over empty batches. we do this since these are basically control messages.
+ while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
+ batch = fragProvider.getNext();
+ }
- try{
if (batch == null) return IterOutcome.NONE;
@@ -119,7 +120,7 @@ public class WireRecordBatch implements RecordBatch{
}else{
return IterOutcome.OK;
}
- }catch(SchemaChangeException ex){
+ }catch(SchemaChangeException | IOException ex){
context.fail(ex);
return IterOutcome.STOP;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 6d8a284..fd392a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -132,7 +132,13 @@ public class MergingRecordBatch implements RecordBatch {
// set up each (non-empty) incoming record batch
List<RawFragmentBatch> rawBatches = Lists.newArrayList();
for (RawFragmentBatchProvider provider : fragProviders) {
- RawFragmentBatch rawBatch = provider.getNext();
+ RawFragmentBatch rawBatch = null;
+ try {
+ rawBatch = provider.getNext();
+ } catch (IOException e) {
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
if (rawBatch.getHeader().getDef().getRecordCount() != 0)
rawBatches.add(rawBatch);
}
@@ -226,7 +232,12 @@ public class MergingRecordBatch implements RecordBatch {
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
- incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+ try {
+ incomingBatches[node.batchId] = fragProviders[node.batchId].getNext();
+ } catch (IOException e) {
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
if (incomingBatches[node.batchId].getHeader().getIsLastBatch() ||
incomingBatches[node.batchId].getHeader().getDef().getRecordCount() == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index 3390af9..6f5f7a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -19,9 +19,11 @@ package org.apache.drill.exec.record;
import org.apache.drill.exec.ops.FragmentContext;
+import java.io.IOException;
+
public interface RawFragmentBatchProvider {
- public RawFragmentBatch getNext();
+ public RawFragmentBatch getNext() throws IOException;
public void kill(FragmentContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
new file mode 100644
index 0000000..10a4dc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class provides a Syncable local extension of the hadoop FileSystem
+ */
+public class LocalSyncableFileSystem extends FileSystem {
+
+ @Override
+ public URI getUri() {
+ try {
+ return new URI("drill-local:///");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ return new FSDataInputStream(new LocalInputStream(path));
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i2, long l, Progressable progressable) throws IOException {
+ return new FSDataOutputStream(new LocalSyncableOutputStream(path));
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+ throw new IOException("Append is not supported in LocalSyncableFilesystem");
+ }
+
+ @Override
+ public boolean rename(Path path, Path path2) throws IOException {
+ throw new IOException("Rename not supported");
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ File file = new File(path.toString());
+ return file.delete();
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) throws IOException {
+ File file = new File(path.toString());
+ if (b) {
+ if (file.isDirectory()) {
+ FileUtils.deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ } else if (file.isDirectory()) {
+ throw new IOException("Cannot delete directory");
+ }
+ file.delete();
+ return true;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ throw new IOException("listStatus not supported");
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return null;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ return new File(path.toString()).mkdirs();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return null;
+ }
+
+ public class LocalSyncableOutputStream extends OutputStream implements Syncable {
+ private FileOutputStream fos;
+ private BufferedOutputStream output;
+
+ public LocalSyncableOutputStream(Path path) throws FileNotFoundException {
+ File dir = new File(path.getParent().toString());
+ if (!dir.exists()) {
+ boolean success = dir.mkdirs();
+ if (!success) {
+ throw new FileNotFoundException("failed to create parent directory");
+ }
+ }
+ fos = new FileOutputStream(new File(path.toString()));
+ output = new BufferedOutputStream(fos, 64*1024);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ output.flush();
+ fos.getFD().sync();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ output.write(b);
+ }
+ }
+
+ public class LocalInputStream extends InputStream implements Seekable, PositionedReadable {
+
+ private FileInputStream input;
+
+ public LocalInputStream(Path path) throws IOException {
+ input = new FileInputStream(path.toString());
+ }
+
+ @Override
+ public int read(long l, byte[] bytes, int i, int i2) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void readFully(long l, byte[] bytes, int i, int i2) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void readFully(long l, byte[] bytes) throws IOException {
+ throw new IOException("unsupported operation");
+ }
+
+ @Override
+ public void seek(long l) throws IOException {
+ input.reset();
+ input.skip(l);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ throw new IOException("getPos not supported");
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ throw new IOException("seekToNewSource not supported");
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ input.read(b);
+ return (int) b[0] & 0xFF;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index d58de2f..7023373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -17,10 +17,17 @@
*/
package org.apache.drill.exec.work.batch;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -38,7 +45,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
private final AtomicInteger parentAccounter;
private final AtomicInteger finishedStreams = new AtomicInteger();
- public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired) {
+ public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
Preconditions.checkArgument(minInputsRequired > 0);
Preconditions.checkNotNull(receiver);
Preconditions.checkNotNull(parentAccounter);
@@ -48,8 +55,15 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
this.remainders = new AtomicIntegerArray(incoming.size());
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
this.buffers = new RawBatchBuffer[minInputsRequired];
- for(int i = 0; i < buffers.length; i++){
- buffers[i] = new UnlimitedRawBatchBuffer();
+ try {
+ String bufferClassName = context.getConfig().getString(ExecConstants.SPOOLING_BUFFER_IMPL);
+ Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class);
+ for(int i = 0; i < buffers.length; i++) {
+ buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context);
+ }
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
+ NoSuchMethodException | ClassNotFoundException e) {
+ context.fail(e);
}
if (receiver.supportsOutOfOrderExchange()) {
this.remainingRequired = new AtomicInteger(1);
@@ -68,7 +82,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
public abstract void streamFinished(int minorFragmentId);
- public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) throws IOException {
boolean decremented = false;
if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
int rem = remainingRequired.decrementAndGet();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index 236b239..539393c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -21,10 +21,12 @@ package org.apache.drill.exec.work.batch;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import java.io.IOException;
+
interface BatchCollector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
- public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) throws IOException ;
public int getOppositeMajorFragmentId();
public RawBatchBuffer[] getBuffers();
public int getTotalIncomingFragments();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 5639851..5d20026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -111,7 +111,7 @@ public class BitComHandlerImpl implements BitComHandler {
@Override
public void startNewRemoteFragment(PlanFragment fragment){
logger.debug("Received remote fragment start instruction", fragment);
- FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
RemoteFragmentRunnerListener listener = new RemoteFragmentRunnerListener(context, tunnel);
try{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index c9e5608..992d9a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,11 +17,13 @@
*/
package org.apache.drill.exec.work.batch;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
@@ -41,12 +43,14 @@ public class IncomingBuffers {
private final AtomicInteger streamsRemaining = new AtomicInteger(0);
private final AtomicInteger remainingRequired = new AtomicInteger(0);
private final Map<Integer, BatchCollector> fragCounts;
+ private final FragmentContext context;
- public IncomingBuffers(PhysicalOperator root) {
+ public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
+ this.context = context;
Map<Integer, BatchCollector> counts = Maps.newHashMap();
CountRequiredFragments reqFrags = new CountRequiredFragments();
root.accept(reqFrags, counts);
-
+
logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), counts);
fragCounts = ImmutableMap.copyOf(counts);
@@ -68,10 +72,14 @@ public class IncomingBuffers {
int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d.", sendMajorFragmentId));
- boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
-
- // we should only return true if remaining required has been decremented and is currently equal to zero.
- return decremented && remainingRequired.get() == 0;
+ try {
+ boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
+
+ // we should only return true if remaining required has been decremented and is currently equal to zero.
+ return decremented && remainingRequired.get() == 0;
+ } catch (IOException e) {
+ throw new FragmentSetupException(e);
+ }
}
public int getRemainingRequired() {
@@ -94,9 +102,9 @@ public class IncomingBuffers {
public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
BatchCollector set;
if (receiver.supportsOutOfOrderExchange()) {
- set = new MergingCollector(remainingRequired, receiver);
+ set = new MergingCollector(remainingRequired, receiver, context);
} else {
- set = new PartitionedCollector(remainingRequired, receiver);
+ set = new PartitionedCollector(remainingRequired, receiver, context);
}
counts.put(set.getOppositeMajorFragmentId(), set);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 670347c..1c92bbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -19,14 +19,15 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
public class MergingCollector extends AbstractFragmentCollector{
private AtomicInteger streamsRunning;
- public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
- super(parentAccounter, receiver, 1);
+ public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+ super(parentAccounter, receiver, 1, context);
streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index af12778..f998eff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -19,12 +19,13 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
public class PartitionedCollector extends AbstractFragmentCollector{
- public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver) {
- super(parentAccounter, receiver, receiver.getProvidingEndpoints().size());
+ public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
+ super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 82ed1ca..e7d3d06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -21,10 +21,12 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import java.io.IOException;
+
public interface RawBatchBuffer extends RawFragmentBatchProvider{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawBatchBuffer.class);
- public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);
+ public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException;
/**
* Inform the buffer that no more records are expected.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
new file mode 100644
index 0000000..fa20b3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Queues;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This implementation of RawBatchBuffer starts writing incoming batches to disk once the buffer size reaches a threshold.
+ * The order of the incoming buffers is maintained.
+ */
+public class SpoolingRawBatchBuffer implements RawBatchBuffer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
+
+ private static String HADOOP_FILESYSTEM_DEFAULT_NAME = "fs.default.name";
+ private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
+ private static final float STOP_SPOOLING_FRACTION = (float) 0.5;
+
+ private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
+ private volatile boolean finished = false;
+ private volatile long queueSize = 0;
+ private long threshold;
+ private FragmentContext context;
+ private volatile AtomicBoolean spooling = new AtomicBoolean(false);
+ private FileSystem fs;
+ private Path path;
+ private FSDataOutputStream outputStream;
+ private FSDataInputStream inputStream;
+
+ public SpoolingRawBatchBuffer(FragmentContext context) throws IOException {
+ this.context = context;
+ this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_FILESYSTEM_DEFAULT_NAME, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
+ conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
+ this.fs = FileSystem.get(conf);
+ this.path = new Path(getDir(), getFileName());
+ }
+
+ public static List<String> DIRS = DrillConfig.create().getStringList(ExecConstants.TEMP_DIRECTORIES);
+
+ public static String getDir() {
+ Random random = new Random();
+ return DIRS.get(random.nextInt(DIRS.size()));
+ }
+
+ @Override
+ public synchronized void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) throws IOException {
+ RawFragmentBatchWrapper wrapper;
+ boolean spool = spooling.get();
+ wrapper = new RawFragmentBatchWrapper(batch, !spool);
+ queueSize += wrapper.getBodySize();
+ if (spool) {
+ if (outputStream == null) {
+ outputStream = fs.create(path);
+ }
+ wrapper.writeToStream(outputStream);
+ }
+ buffer.add(wrapper);
+ if (!spool && queueSize > threshold) {
+ logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", queueSize, threshold);
+ spooling.set(true);
+ }
+ }
+
+ @Override
+ public void kill(FragmentContext context) {
+ cleanup();
+ }
+
+
+ @Override
+ public void finished() {
+ finished = true;
+ }
+
+ @Override
+ public RawFragmentBatch getNext() throws IOException {
+ boolean spool = spooling.get();
+ RawFragmentBatchWrapper w = buffer.poll();
+ RawFragmentBatch batch;
+ if(w == null && !finished){
+ try {
+ w = buffer.take();
+ batch = w.get();
+ queueSize -= w.getBodySize();
+ return batch;
+ } catch (InterruptedException e) {
+ cleanup();
+ return null;
+ }
+ }
+ if (w == null) {
+ cleanup();
+ return null;
+ }
+
+ batch = w.get();
+ queueSize -= w.getBodySize();
+ assert queueSize >= 0;
+ if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
+ logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
+ spooling.set(false);
+ }
+ return batch;
+ }
+
+ private void cleanup() {
+ try {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException e) {
+ logger.warn("Failed to cleanup I/O streams", e);
+ }
+ if (context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
+ try {
+ fs.delete(path,false);
+ } catch (IOException e) {
+ logger.warn("Failed to delete temporary files", e);
+ }
+ logger.debug("Deleted file {}", path.toString());
+ }
+ }
+
+ private class RawFragmentBatchWrapper {
+ private RawFragmentBatch batch;
+ private boolean available;
+ private CountDownLatch latch = new CountDownLatch(1);
+ private int bodyLength;
+
+ public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
+ Preconditions.checkNotNull(batch);
+ this.batch = batch;
+ this.available = available;
+ }
+
+ public boolean isNull() {
+ return batch == null;
+ }
+
+ public RawFragmentBatch get() throws IOException {
+ if (available) {
+ return batch;
+ } else {
+ if (inputStream == null) {
+ inputStream = fs.open(path);
+ }
+ readFromStream(inputStream);
+ available = true;
+ return batch;
+ }
+ }
+
+ public long getBodySize() {
+ if (batch.getBody() == null) {
+ return 0;
+ }
+ assert batch.getBody().readableBytes() >= 0;
+ return batch.getBody().readableBytes();
+ }
+
+ public void writeToStream(FSDataOutputStream stream) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ available = false;
+ batch.getHeader().writeDelimitedTo(stream);
+ ByteBuf buf = batch.getBody();
+ if (buf == null) {
+ bodyLength = 0;
+ return;
+ }
+ bodyLength = buf.readableBytes();
+ buf.getBytes(0, stream, bodyLength);
+ stream.sync();
+ long t = watch.elapsed(TimeUnit.MICROSECONDS);
+ logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ buf.release();
+ }
+
+ public void readFromStream(FSDataInputStream stream) throws IOException {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ ExecProtos.FragmentRecordBatch header = ExecProtos.FragmentRecordBatch.parseDelimitedFrom(stream);
+ ByteBuf buf = context.getAllocator().buffer(bodyLength);
+ buf.writeBytes(stream, bodyLength);
+ batch = new RawFragmentBatch(header, buf);
+ available = true;
+ latch.countDown();
+ long t = watch.elapsed(TimeUnit.MICROSECONDS);
+ logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ }
+ }
+
+ private String getFileName() {
+ ExecProtos.FragmentHandle handle = context.getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String fileName = String.format("%s_%s_%s", qid, majorFragmentId, minorFragmentId);
+
+ return fileName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 43870da..64012c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -31,6 +31,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
private volatile boolean finished = false;
+
+ public UnlimitedRawBatchBuffer(FragmentContext context) {
+
+ }
@Override
public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 588316b..2cb57dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -78,9 +78,9 @@ class RunningFragmentManager implements FragmentStatusListener{
// set up the root fragment first so we'll have incoming buffers available.
{
- IncomingBuffers buffers = new IncomingBuffers(rootOperator);
-
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+ rootContext.setBuffers(buffers);
RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
// add fragment to local node.
map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index d947d68..6157229 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -56,8 +56,9 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
try{
this.fragment = fragment;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- this.buffers = new IncomingBuffers(root);
- this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
+ this.context = new FragmentContext(context, fragment.getHandle(), null, new FunctionImplementationRegistry(context.getConfig()));
+ this.buffers = new IncomingBuffers(root, this.context);
+ this.context.setBuffers(buffers);
this.runnerListener = new RemoteFragmentRunnerListener(this.context, foremanTunnel);
this.reader = context.getPlanReader();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 725c6b4..b84c406 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -66,9 +66,18 @@ drill.exec: {
max.width.per.endpoint: 5,
global.max.width: 100,
executor.threads: 4
- }
+ },
trace: {
directory: "/var/log/drill",
filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 100000000
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index 0ecab3a..c06818d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -74,7 +74,7 @@ public class DumpCatTest {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
index aa67ba5..8f56464 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
@@ -67,7 +67,7 @@ public class TestRepeatedFunction {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_repeated_1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
boolean oneIsOne = false;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 4dea33d..ea0ac2c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -62,7 +62,7 @@ public class TestComparisonFunctions {
String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression);
if(reader == null) reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
if(registry == null) registry = new FunctionImplementationRegistry(c);
- if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ if(context == null) context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
PhysicalPlan plan = reader.readPhysicalPlan(planString);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 05d57be..99ab362 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -99,7 +99,7 @@ public class TestOptiqPlans {
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext fctxt = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(fctxt, (FragmentRoot) pp.getSortedOperators(false).iterator().next()));
return exec;
@@ -278,7 +278,7 @@ public class TestOptiqPlans {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), reg);
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
return exec;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index 411f21c..d92d9fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -65,7 +65,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNull.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -93,7 +93,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testIsNotNull.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -121,7 +121,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstring.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -161,7 +161,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testSubstringNegative.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
@@ -201,7 +201,7 @@ public class TestSimpleFunctions {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/testByteSubstring.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
index 4eae66f..89f3292 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
@@ -61,7 +61,7 @@ public class TestAgg {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
return exec;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index d1c756f..5429bcf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -64,7 +64,7 @@ public class TestSimpleFilter {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
assertEquals(50, exec.getRecordCount());
@@ -88,7 +88,7 @@ public class TestSimpleFilter {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
while(exec.next()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 41cb034..0120c7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -70,7 +70,7 @@ public class TestMergeJoin {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -124,7 +124,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -177,7 +177,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
@@ -230,7 +230,7 @@ public class TestMergeJoin {
.replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
.replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int totalRecordCount = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 1ee9ceb..d82c0e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -95,7 +95,7 @@ public class TestSimpleLimit {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
while(exec.next()){
@@ -114,7 +114,7 @@ public class TestSimpleLimit {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int recordCount = 0;
long sum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index 72dbe6e..fa67e06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -67,7 +67,7 @@ public class TestSimpleProjection {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/project/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
index 4472668..a5837ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -66,7 +66,7 @@ public class TestSimpleSort {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/one_key_sort.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int previousInt = Integer.MIN_VALUE;
@@ -114,7 +114,7 @@ public class TestSimpleSort {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/two_key_sort.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int previousInt = Integer.MIN_VALUE;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index 5d2b67c..b53d1d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -66,7 +66,7 @@ public class TestSVRemover {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/remover/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
int count = exec.getRecordCount();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index a3d923e..11aae2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -70,7 +70,7 @@ public class TestTraceMultiRecordBatch {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/multi_record_batch_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index 7643cf8..10ce997 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -84,7 +84,7 @@ public class TestTraceOutputDump {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
index fe90ad4..836f177 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
@@ -64,7 +64,7 @@ public class TestSimpleUnion {
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/union/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
- FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
int[] counts = new int[]{100,50};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
new file mode 100644
index 0000000..4406e04
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/FileTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class FileTest {
+ public static void main(String[] args) throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "sync:///");
+ System.out.println(FileSystem.getDefaultUri(conf));
+ FileSystem fs = FileSystem.get(conf);
+// FileSystem fs = new LocalSyncableFileSystem(conf);
+ Path path = new Path("/tmp/testFile");
+ FSDataOutputStream out = fs.create(path);
+ byte[] s = "hello world".getBytes();
+ out.write(s);
+ out.sync();
+// out.close();
+ FSDataInputStream in = fs.open(path);
+ byte[] bytes = new byte[s.length];
+ in.read(bytes);
+ System.out.println(new String(bytes));
+ File file = new File("/tmp/testFile");
+ FileOutputStream fos = new FileOutputStream(file);
+ FileInputStream fis = new FileInputStream(file);
+ fos.write(s);
+ fos.getFD().sync();
+ fis.read(bytes);
+ System.out.println(new String(bytes));
+ out = fs.create(new Path("/tmp/file"));
+ for (int i = 0; i < 100; i++) {
+ bytes = new byte[256*1024];
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ out.write(bytes);
+ out.sync();
+ long t = watch.elapsed(TimeUnit.MILLISECONDS);
+ System.out.printf("Elapsed: %d. Rate %d.\n", t, (long) ((long) bytes.length * 1000L / t));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
new file mode 100644
index 0000000..cf5e128
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSpoolingBuffer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpoolingBuffer.class);
+
+ @Test
+ public void testMultipleExchangesSingleThread() throws Exception {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ DrillConfig conf = DrillConfig.create("drill-spool-test-module.conf");
+
+ try(Drillbit bit1 = new Drillbit(conf, serviceSet);
+ DrillClient client = new DrillClient(conf, serviceSet.getCoordinator());) {
+
+ bit1.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/work/batch/multiple_exchange.json"),
+ Charsets.UTF_8));
+ int count = 0;
+ for(QueryResultBatch b : results) {
+ if (b.getHeader().getRowCount() != 0)
+ count += b.getHeader().getRowCount();
+ }
+ assertEquals(500024, count);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-module.conf b/exec/java-exec/src/test/resources/drill-module.conf
index 99dd863..5803610 100644
--- a/exec/java-exec/src/test/resources/drill-module.conf
+++ b/exec/java-exec/src/test/resources/drill-module.conf
@@ -56,8 +56,8 @@ drill.exec: {
start: 35000
},
work: {
- max.width.per.endpoint: 5,
+ max.width.per.endpoint: 1,
global.max.width: 100,
- executor.threads: 4
+ executor.threads: 1
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c811a83/exec/java-exec/src/test/resources/drill-spool-test-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-spool-test-module.conf b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
new file mode 100644
index 0000000..c20cc85
--- /dev/null
+++ b/exec/java-exec/src/test/resources/drill-spool-test-module.conf
@@ -0,0 +1,83 @@
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
+drill.exec: {
+ cluster-id: "drillbits1"
+ rpc: {
+ user: {
+ server: {
+ port: 31010
+ threads: 1
+ }
+ client: {
+ threads: 1
+ }
+ },
+ bit: {
+ server: {
+ port : 31011,
+ retry:{
+ count: 7200,
+ delay: 500
+ },
+ threads: 1
+ }
+ },
+ use.ip : false
+ },
+ operator: {
+ packages += "org.apache.drill.exec.physical.config"
+ },
+ optimizer: {
+ implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+ },
+ functions: ["org.apache.drill.expr.fn.impl"],
+ storage: {
+ packages += "org.apache.drill.exec.store"
+ },
+ metrics : {
+ context: "drillbit",
+ jmx: {
+ enabled : true
+ },
+ log: {
+ enabled : false,
+ interval : 60
+ }
+ },
+ zk: {
+ connect: "localhost:2181",
+ root: "/drill",
+ refresh: 500,
+ timeout: 5000,
+ retry: {
+ count: 7200,
+ delay: 500
+ }
+ },
+ functions: ["org.apache.drill.expr.fn.impl"],
+ network: {
+ start: 35000
+ },
+ work: {
+ max.width.per.endpoint: 1,
+ global.max.width: 100,
+ executor.threads: 1
+ },
+ trace: {
+ directory: "/var/log/drill",
+ filesystem: "file:///"
+ },
+ tmp: {
+ directories: ["/tmp/drill"],
+ filesystem: "drill-local:///"
+ },
+ spooling: {
+ impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
+ delete: false,
+ size: 0
+ }
+}
\ No newline at end of file
[09/10] git commit: DRILL-297: Trace operator throws NPE if the
configured path is not writable
Posted by ja...@apache.org.
DRILL-297: Trace operator throws NPE if the configured path is not writable
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/366bf8e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/366bf8e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/366bf8e4
Branch: refs/heads/master
Commit: 366bf8e4564333dd26b1f5672d4dd0fea28afacc
Parents: 316ce8a
Author: Mehant Baid <me...@github.com>
Authored: Sun Dec 1 20:09:04 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 20:09:04 2013 -0800
----------------------------------------------------------------------
.../apache/drill/exec/physical/impl/trace/TraceRecordBatch.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/366bf8e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 1b990c9..c138aba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -32,6 +32,7 @@ import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -79,7 +80,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
/* File descriptors needed to be able to dump to log file */
private OutputStream fos;
- public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+ public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
super(pop, context, incoming);
this.traceTag = pop.traceTag;
logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
@@ -95,7 +96,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
/* create the file */
fos = fs.create(new Path(fileName));
} catch (IOException e) {
- logger.error("Unable to create file: " + fileName);
+ throw new ExecutionSetupException("Unable to create file: " + fileName + " check permissions or if directory exists", e);
}
}
[04/10] git commit: DRILL-313: Fix for Limit operator only
transferring buffers on new schema
Posted by ja...@apache.org.
DRILL-313: Fix for Limit operator only transferring buffers on new schema
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c0389f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c0389f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c0389f3
Branch: refs/heads/master
Commit: 6c0389f394a789beb74103309f5bed13ddeccf95
Parents: b91f2e8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 19:48:57 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:48:57 2013 -0800
----------------------------------------------------------------------
.../physical/impl/limit/LimitRecordBatch.java | 8 ++--
.../drill/exec/fn/impl/GeneratorFunctions.java | 22 ++++++++-
.../physical/impl/limit/TestSimpleLimit.java | 35 ++++++++++++++
.../src/test/resources/limit/test4.json | 49 ++++++++++++++++++++
4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8390997..712af9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -36,6 +36,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
private int recordsLeft;
private boolean noEndLimit;
private boolean skipBatch;
+ List<TransferPair> transfers = Lists.newArrayList();
public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context, incoming);
@@ -52,7 +53,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
protected void setupNewSchema() throws SchemaChangeException {
container.clear();
- List<TransferPair> transfers = Lists.newArrayList();
for(VectorWrapper<?> v : incoming){
TransferPair pair = v.getValueVector().getTransferPair();
@@ -74,9 +74,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
- for(TransferPair tp : transfers) {
- tp.transfer();
- }
}
@Override
@@ -96,6 +93,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
protected void doWork() {
+ for(TransferPair tp : transfers) {
+ tp.transfer();
+ }
skipBatch = false;
int recordCount = incoming.getRecordCount();
if(recordCount <= recordsToSkip) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
index d12633e..b79ccd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.RecordBatch;
@@ -38,17 +39,36 @@ public class GeneratorFunctions {
OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt");
public static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true),
OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8");
+ public static final FunctionDefinition INCREASING_BIGINT = FunctionDefinition.simple("increasingBigInt", new ArgumentValidators.NumericTypeAllowed(1, true),
+ OutputTypeDeterminer.FIXED_BIGINT, "increasingBigInt");
public static class Provider implements CallProvider {
@Override
public FunctionDefinition[] getFunctionDefintions() {
return new FunctionDefinition[] { RANDOM_BIG_INT,
- RANDOM_FLOAT8 };
+ RANDOM_FLOAT8,
+ INCREASING_BIGINT };
}
}
+ @FunctionTemplate(name = "increasingBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+ public static class IncreasingBigInt implements DrillSimpleFunc {
+
+ @Param BigIntHolder start;
+ @Workspace long current;
+ @Output BigIntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ current = 0;
+ }
+
+ public void eval() {
+ out.value = start.value + current++;
+ }
+ }
+
@FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
public static class RandomBigIntGauss implements DrillSimpleFunc {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index b254fc0..1ee9ceb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -38,8 +38,11 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -79,6 +82,13 @@ public class TestSimpleLimit {
}};
verifyLimitCount(bitContext, connection, "test2.json", 69999);
+ long start = 30000;
+ long end = 100000;
+ long expectedSum = (end - start) * (end + start - 1) / 2; //Formula for sum of series
+
+ verifySum(bitContext, connection, "test4.json", 70000, expectedSum);
+
+
}
private void verifyLimitCount(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount) throws Throwable {
@@ -99,4 +109,29 @@ public class TestSimpleLimit {
}
assertTrue(!context.isFailed());
}
+
+ private void verifySum(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount, long expectedSum) throws Throwable {
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+ int recordCount = 0;
+ long sum = 0;
+ while(exec.next()){
+ recordCount += exec.getRecordCount();
+ BigIntVector v = (BigIntVector) exec.iterator().next();
+ for (int i = 0; i < v.getAccessor().getValueCount(); i++) {
+ sum += v.getAccessor().get(i);
+ }
+ }
+
+ assertEquals(expectedCount, recordCount);
+ assertEquals(expectedSum, sum);
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/resources/limit/test4.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test4.json b/exec/java-exec/src/test/resources/limit/test4.json
new file mode 100644
index 0000000..b7793b1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test4.json
@@ -0,0 +1,49 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100000000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"project",
+ exprs: [
+ { ref: "col1", expr:"increasingBigInt(0)"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"limit",
+ first:30000,
+ last:100000
+ },
+ {
+ @id:4,
+ child:3,
+ pop: "selection-vector-remover"
+
+ },
+ {
+ @id: 5,
+ child: 4,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file