You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/11/14 00:14:56 UTC
[2/2] drill git commit: DRILL-5842: Refactor fragment, operator contexts
DRILL-5842: Refactor fragment, operator contexts
This closes #978
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c56de2f1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c56de2f1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c56de2f1
Branch: refs/heads/master
Commit: c56de2f13a36d673f0c5836d44817137e64b91e4
Parents: 42fc11e
Author: Paul Rogers <pr...@maprtech.com>
Authored: Wed Oct 4 22:43:44 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Nov 13 14:55:39 2017 -0800
----------------------------------------------------------------------
.../exec/ops/AbstractOperatorExecContext.java | 95 ---------
.../drill/exec/ops/AccountingDataTunnel.java | 3 +-
.../drill/exec/ops/BaseFragmentContext.java | 90 +++++++++
.../drill/exec/ops/BaseOperatorContext.java | 196 +++++++++++++++++++
.../apache/drill/exec/ops/FragmentContext.java | 81 +++-----
.../exec/ops/FragmentContextInterface.java | 149 ++++++++++++++
.../drill/exec/ops/FragmentExecContext.java | 131 -------------
.../apache/drill/exec/ops/OperExecContext.java | 90 ---------
.../drill/exec/ops/OperExecContextImpl.java | 146 --------------
.../apache/drill/exec/ops/OperatorContext.java | 97 ++++++++-
.../drill/exec/ops/OperatorContextImpl.java | 76 ++-----
.../drill/exec/ops/OperatorExecContext.java | 46 -----
.../drill/exec/ops/OperatorStatReceiver.java | 4 +
.../apache/drill/exec/ops/OperatorStats.java | 2 +
.../drill/exec/physical/impl/ScanBatch.java | 5 +-
.../physical/impl/xsort/SingleBatchSorter.java | 4 +-
.../impl/xsort/SingleBatchSorterTemplate.java | 6 +-
.../impl/xsort/managed/BaseSortWrapper.java | 9 +-
.../impl/xsort/managed/BaseWrapper.java | 8 +-
.../impl/xsort/managed/BufferedBatches.java | 6 +-
.../impl/xsort/managed/ExternalSortBatch.java | 10 +-
.../impl/xsort/managed/MSortTemplate.java | 8 +-
.../physical/impl/xsort/managed/MSorter.java | 4 +-
.../impl/xsort/managed/MergeSortWrapper.java | 15 +-
.../managed/PriorityQueueCopierWrapper.java | 10 +-
.../physical/impl/xsort/managed/SortImpl.java | 10 +-
.../impl/xsort/managed/SortMetrics.java | 1 +
.../impl/xsort/managed/SorterWrapper.java | 10 +-
.../impl/xsort/managed/SpilledRuns.java | 8 +-
.../exec/store/dfs/DrillFSDataInputStream.java | 11 +-
.../drill/exec/store/dfs/DrillFileSystem.java | 6 +-
.../impl/xsort/managed/SortTestUtilities.java | 8 +-
.../impl/xsort/managed/TestSortImpl.java | 8 +-
.../physical/impl/xsort/managed/TestSorter.java | 6 +-
.../physical/unit/BasicPhysicalOpUnitTest.java | 3 +
.../physical/unit/MiniPlanUnitTestBase.java | 36 ++--
.../physical/unit/PhysicalOpUnitTestBase.java | 14 +-
.../org/apache/drill/test/OperatorFixture.java | 124 +++++++-----
38 files changed, 768 insertions(+), 768 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
deleted file mode 100644
index ebef55c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Implementation of {@link OperatorExecContext} that provides services
- * needed by most run-time operators. Excludes services that need the
- * entire Drillbit. Allows easy testing of operator code that uses this
- * interface.
- */
-
-public class AbstractOperatorExecContext implements OperatorExecContext {
-
- protected final BufferAllocator allocator;
- protected final ExecutionControls executionControls;
- protected final PhysicalOperator popConfig;
- protected final BufferManager manager;
- protected OperatorStatReceiver statsWriter;
-
- public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
- ExecutionControls executionControls,
- OperatorStatReceiver stats) {
- this.allocator = allocator;
- this.popConfig = popConfig;
- this.manager = new BufferManagerImpl(allocator);
- statsWriter = stats;
-
- this.executionControls = executionControls;
- }
-
- @Override
- public DrillBuf replace(DrillBuf old, int newSize) {
- return manager.replace(old, newSize);
- }
-
- @Override
- public DrillBuf getManagedBuffer() {
- return manager.getManagedBuffer();
- }
-
- @Override
- public DrillBuf getManagedBuffer(int size) {
- return manager.getManagedBuffer(size);
- }
-
- @Override
- public ExecutionControls getExecutionControls() {
- return executionControls;
- }
-
- @Override
- public BufferAllocator getAllocator() {
- if (allocator == null) {
- throw new UnsupportedOperationException("Operator context does not have an allocator");
- }
- return allocator;
- }
-
- @Override
- public void close() {
- try {
- manager.close();
- } finally {
- if (allocator != null) {
- allocator.close();
- }
- }
- }
-
- @Override
- public OperatorStatReceiver getStatsWriter() {
- return statsWriter;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 22923bb..44aa280 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,7 +23,6 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.slf4j.Logger;
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
new file mode 100644
index 0000000..a39213c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Common implementation for both the test and production versions
+ * of the fragment context.
+ */
+
+public abstract class BaseFragmentContext implements FragmentContextInterface {
+
+ private final FunctionImplementationRegistry funcRegistry;
+
+ public BaseFragmentContext(final FunctionImplementationRegistry funcRegistry) {
+ this.funcRegistry = funcRegistry;
+ }
+
+ @Override
+ public FunctionImplementationRegistry getFunctionRegistry() {
+ return funcRegistry;
+ }
+
+ protected abstract CodeCompiler getCompiler();
+
+ @Override
+ public <T> T getImplementationClass(final ClassGenerator<T> cg)
+ throws ClassTransformationException, IOException {
+ return getImplementationClass(cg.getCodeGenerator());
+ }
+
+ @Override
+ public <T> T getImplementationClass(final CodeGenerator<T> cg)
+ throws ClassTransformationException, IOException {
+ return getCompiler().createInstance(cg);
+ }
+
+ @Override
+ public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
+ return getImplementationClass(cg.getCodeGenerator(), instanceCount);
+ }
+
+ @Override
+ public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
+ return getCompiler().createInstances(cg, instanceCount);
+ }
+
+ protected abstract BufferManager getBufferManager();
+
+ @Override
+ public DrillBuf replace(final DrillBuf old, final int newSize) {
+ return getBufferManager().replace(old, newSize);
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer() {
+ return getBufferManager().getManagedBuffer();
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer(final int size) {
+ return getBufferManager().getManagedBuffer(size);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
new file mode 100644
index 0000000..123f8fa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. This class provides services common to the test-time
+ * version of the operator context and the full production-time context
+ * that includes network services.
+ */
+
+public abstract class BaseOperatorContext implements OperatorContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
+
+ protected final FragmentContextInterface context;
+ protected final BufferAllocator allocator;
+ protected final PhysicalOperator popConfig;
+ protected final BufferManager manager;
+ private DrillFileSystem fs;
+ private ControlsInjector injector;
+
+ public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
+ PhysicalOperator popConfig) {
+ this.context = context;
+ this.allocator = allocator;
+ this.popConfig = popConfig;
+ this.manager = new BufferManagerImpl(allocator);
+ }
+
+ @Override
+ public FragmentContextInterface getFragmentContext() {
+ return context;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends PhysicalOperator> T getOperatorDefn() {
+ return (T) popConfig;
+ }
+
+ public String getName() {
+ return popConfig.getClass().getName();
+ }
+
+ @Override
+ public DrillBuf replace(DrillBuf old, int newSize) {
+ return manager.replace(old, newSize);
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer() {
+ return manager.getManagedBuffer();
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer(int size) {
+ return manager.getManagedBuffer(size);
+ }
+
+ @Override
+ public ExecutionControls getExecutionControls() {
+ return context.getExecutionControls();
+ }
+
+ @Override
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException("Operator context does not have an allocator");
+ }
+ return allocator;
+ }
+
+ // Allow an operator to use the thread pool
+ @Override
+ public ExecutorService getExecutor() {
+ return context.getDrillbitContext().getExecutor();
+ }
+
+ @Override
+ public ExecutorService getScanExecutor() {
+ return context.getDrillbitContext().getScanExecutor();
+ }
+
+ @Override
+ public ExecutorService getScanDecodeExecutor() {
+ return context.getDrillbitContext().getScanDecodeExecutor();
+ }
+
+ @Override
+ public void setInjector(ControlsInjector injector) {
+ this.injector = injector;
+ }
+
+ @Override
+ public ControlsInjector getInjector() {
+ return injector;
+ }
+
+ @Override
+ public void injectUnchecked(String desc) {
+ ExecutionControls executionControls = context.getExecutionControls();
+ if (injector != null && executionControls != null) {
+ injector.injectUnchecked(executionControls, desc);
+ }
+ }
+
+ @Override
+ public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
+ throws T {
+ ExecutionControls executionControls = context.getExecutionControls();
+ if (injector != null && executionControls != null) {
+ injector.injectChecked(executionControls, desc, exceptionClass);
+ }
+ }
+
+ @Override
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ manager.close();
+ } catch (RuntimeException e) {
+ ex = e;
+ }
+ try {
+ if (allocator != null) {
+ allocator.close();
+ }
+ } catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ }
+ try {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ } catch (IOException e) {
+ if (ex == null) {
+ ex = UserException
+ .resourceError(e)
+ .addContext("Failed to close the Drill file system for " + getName())
+ .build(logger);
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ @Override
+ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+ Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+ fs = new DrillFileSystem(conf, getStatsWriter());
+ return fs;
+ }
+
+ /**
+ * Creates a DrillFileSystem that does not automatically track operator stats.
+ */
+ @Override
+ public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
+ Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+ fs = new DrillFileSystem(conf, null);
+ return fs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 19ffca2..1cde97a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,22 +17,18 @@
*/
package org.apache.drill.exec.ops;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -59,15 +55,21 @@ import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.batch.IncomingBuffers;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.DrillBuf;
/**
* Contextual objects required for execution of a particular fragment.
+ * This is the implementation; use <tt>FragmentContextInterface</tt>
+ * in code to allow tests to use test-time implementations.
*/
-public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExecContext {
+
+public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
@@ -77,7 +79,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
private final UserClientConnection connection; // is null if this context is for non-root fragment
private final QueryContext queryContext; // is null if this context is for non-root fragment
private final FragmentStats stats;
- private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
private final PlanFragment fragment;
private final ContextInformation contextInformation;
@@ -87,7 +88,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
private ExecutorState executorState;
private final ExecutionControls executionControls;
-
private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@Override
@@ -135,12 +135,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
+ super(funcRegistry);
this.context = dbContext;
this.queryContext = queryContext;
this.connection = connection;
this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
this.fragment = fragment;
- this.funcRegistry = funcRegistry;
contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
@@ -225,6 +225,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
return executorState.shouldContinue();
}
+ @Override
public DrillbitContext getDrillbitContext() {
return context;
}
@@ -313,25 +314,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
}
@Override
- public <T> T getImplementationClass(final ClassGenerator<T> cg)
- throws ClassTransformationException, IOException {
- return getImplementationClass(cg.getCodeGenerator());
- }
-
- @Override
- public <T> T getImplementationClass(final CodeGenerator<T> cg)
- throws ClassTransformationException, IOException {
- return context.getCompiler().createInstance(cg);
- }
-
- @Override
- public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
- return getImplementationClass(cg.getCodeGenerator(), instanceCount);
- }
-
- @Override
- public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
- return context.getCompiler().createInstances(cg, instanceCount);
+ protected CodeCompiler getCompiler() {
+ return context.getCompiler();
}
public AccountingUserConnection getUserDataTunnel() {
@@ -383,11 +367,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
}
@Override
- public FunctionImplementationRegistry getFunctionRegistry() {
- return funcRegistry;
- }
-
- @Override
public DrillConfig getConfig() {
return context.getConfig();
}
@@ -439,19 +418,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
}
}
- public DrillBuf replace(final DrillBuf old, final int newSize) {
- return bufferManager.replace(old, newSize);
- }
-
- @Override
- public DrillBuf getManagedBuffer() {
- return bufferManager.getManagedBuffer();
- }
-
- public DrillBuf getManagedBuffer(final int size) {
- return bufferManager.getManagedBuffer(size);
- }
-
@Override
public PartitionExplorer getPartitionExplorer() {
throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " +
@@ -494,6 +460,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
return buffers.isDone();
}
+ @Override
+ protected BufferManager getBufferManager() {
+ return bufferManager;
+ }
+
public interface ExecutorState {
/**
* Whether execution should continue.
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
new file mode 100644
index 0000000..7d4ba18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Fragment context interface: separates implementation from definition.
+ * Allows unit testing by mocking or reimplementing services with
+ * test-time versions. The name is awkward, chosen to avoid renaming
+ * the implementation class which is used in many places in legacy code.
+ * New code should use this interface, and the names should eventually
+ * be swapped with <tt>FragmentContext</tt> becoming
+ * <tt>FragmentContextImpl</tt> and this interface becoming
+ * <tt>FragmentContext</tt>.
+ */
+
+public interface FragmentContextInterface {
+
+ /**
+ * Drillbit context. Valid only in production; returns null in
+ * operator test environments.
+ */
+
+ DrillbitContext getDrillbitContext();
+
+ /**
+ * Returns the UDF registry.
+ * @return the UDF registry
+ */
+ FunctionImplementationRegistry getFunctionRegistry();
+ /**
+ * Returns a read-only version of the session options.
+ * @return the session options
+ */
+ OptionSet getOptionSet();
+
+ /**
+ * Generates code for a class given a {@link ClassGenerator},
+ * and returns a single instance of the generated class. (Note
+ * that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstance</tt>.)
+ *
+ * @param cg the class generator
+ * @return an instance of the generated class
+ */
+
+ <T> T getImplementationClass(final ClassGenerator<T> cg)
+ throws ClassTransformationException, IOException;
+
+ /**
+ * Generates code for a class given a {@link CodeGenerator},
+ * and returns a single instance of the generated class. (Note
+ * that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstance</tt>.)
+ *
+ * @param cg the code generator
+ * @return an instance of the generated class
+ */
+
+ <T> T getImplementationClass(final CodeGenerator<T> cg)
+ throws ClassTransformationException, IOException;
+
+ /**
+ * Generates code for a class given a {@link ClassGenerator}, and returns the
+ * specified number of instances of the generated class. (Note that the name
+ * is a misnomer; it would be better called
+ * <tt>getImplementationInstances</tt>.)
+ *
+ * @param cg the class generator
+ * @param instanceCount the number of instances to create
+ * @return list of instances of the generated class
+ */
+
+ <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
+ throws ClassTransformationException, IOException;
+
+ /**
+ * Generates code for a class given a {@link CodeGenerator}, and returns the
+ * specified number of instances of the generated class. (Note that the name
+ * is a misnomer; it would be better called
+ * <tt>getImplementationInstances</tt>.)
+ *
+ * @param cg the code generator
+ * @param instanceCount the number of instances to create
+ * @return list of instances of the generated class
+ */
+
+ <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
+ throws ClassTransformationException, IOException;
+
+ /**
+ * Determine if fragment execution has been interrupted.
+ * @return true if execution should continue, false if an interruption has
+ * occurred and fragment execution should halt
+ */
+
+ boolean shouldContinue();
+
+ /**
+ * Return the set of execution controls used to inject faults into running
+ * code for testing.
+ *
+ * @return the execution controls
+ */
+ ExecutionControls getExecutionControls();
+
+ /**
+ * Returns the Drill configuration for this run. Note that the config is
+ * global and immutable.
+ *
+ * @return the Drill configuration
+ */
+
+ DrillConfig getConfig();
+
+ DrillBuf replace(DrillBuf old, int newSize);
+
+ DrillBuf getManagedBuffer();
+
+ DrillBuf getManagedBuffer(int size);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
deleted file mode 100644
index 526c030..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * Services passed to fragments that deal only with execution details
- * such as the function registry, options, code generation and the like.
- * Does not include top-level services such as network endpoints. Code
- * written to use this interface can be unit tested quite easily using
- * the {@link OperatorContext} class. Code that uses the wider,
- * more global {@link FragmentContext} must be tested in the context
- * of the entire Drill server, or using mocks for the global services.
- */
-
-public interface FragmentExecContext {
- /**
- * Returns the UDF registry.
- * @return the UDF registry
- */
- FunctionImplementationRegistry getFunctionRegistry();
- /**
- * Returns a read-only version of the session options.
- * @return the session options
- */
- OptionSet getOptionSet();
-
- /**
- * Generates code for a class given a {@link ClassGenerator},
- * and returns a single instance of the generated class. (Note
- * that the name is a misnomer, it would be better called
- * <tt>getImplementationInstance</tt>.)
- *
- * @param cg the class generator
- * @return an instance of the generated class
- */
-
- <T> T getImplementationClass(final ClassGenerator<T> cg)
- throws ClassTransformationException, IOException;
-
- /**
- * Generates code for a class given a {@link CodeGenerator},
- * and returns a single instance of the generated class. (Note
- * that the name is a misnomer, it would be better called
- * <tt>getImplementationInstance</tt>.)
- *
- * @param cg the code generator
- * @return an instance of the generated class
- */
-
- <T> T getImplementationClass(final CodeGenerator<T> cg)
- throws ClassTransformationException, IOException;
-
- /**
- * Generates code for a class given a {@link ClassGenerator}, and returns the
- * specified number of instances of the generated class. (Note that the name
- * is a misnomer, it would be better called
- * <tt>getImplementationInstances</tt>.)
- *
- * @param cg
- * the class generator
- * @return list of instances of the generated class
- */
-
- <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
- throws ClassTransformationException, IOException;
-
- /**
- * Generates code for a class given a {@link CodeGenerator}, and returns the
- * specified number of instances of the generated class. (Note that the name
- * is a misnomer, it would be better called
- * <tt>getImplementationInstances</tt>.)
- *
- * @param cg
- * the code generator
- * @return list of instances of the generated class
- */
-
- <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
- throws ClassTransformationException, IOException;
-
- /**
- * Determine if fragment execution has been interrupted.
- * @return true if execution should continue, false if an interruption has
- * occurred and fragment execution should halt
- */
-
- boolean shouldContinue();
-
- /**
- * Return the set of execution controls used to inject faults into running
- * code for testing.
- *
- * @return the execution controls
- */
- ExecutionControls getExecutionControls();
-
- /**
- * Returns the Drill configuration for this run. Note that the config is
- * global and immutable.
- *
- * @return the Drill configuration
- */
-
- DrillConfig getConfig();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
deleted file mode 100644
index 89f3b63..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.testing.ControlsInjector;
-
-/**
- * Defines the set of services used by operator implementations. This
- * is a subset of the full {@link OperatorContext} which removes global
- * services such as network endpoints. Code written to this interface
- * can be easily unit tested using the {@link OperatorFixture} class.
- * Code that needs global services must be tested in the Drill server
- * as a whole, or using mocks for global services.
- */
-
-public interface OperExecContext extends FragmentExecContext {
-
- /**
- * Return the physical operator definition created by the planner and passed
- * into the Drillbit executing the query.
- * @return the physical operator definition
- */
-
- <T extends PhysicalOperator> T getOperatorDefn();
-
- /**
- * Return the memory allocator for this operator.
- *
- * @return the per-operator memory allocator
- */
-
- BufferAllocator getAllocator();
-
- /**
- * A write-only interface to the Drill statistics mechanism. Allows
- * operators to update statistics.
- * @return operator statistics
- */
-
- OperatorStatReceiver getStats();
-
- /**
- * Returns the fault injection mechanism used to introduce faults at runtime
- * for testing.
- * @return the fault injector
- */
-
- ControlsInjector getInjector();
-
- /**
- * Insert an unchecked fault (exception). Handles the details of checking if
- * fault injection is enabled and this particular fault is selected.
- * @param desc the description of the fault used to match a fault
- * injection parameter to determine if the fault should be injected
- * @throws RuntimeException an unchecked exception if the fault is enabled
- */
-
- void injectUnchecked(String desc);
-
- /**
- * Insert a checked fault (exception) of the given class. Handles the details
- * of checking if fault injection is enabled and this particular fault is
- * selected.
- *
- * @param desc the description of the fault used to match a fault
- * injection parameter to determine if the fault should be injected
- * @param exceptionClass the class of exeception to be thrown
- * @throws T if the fault is enabled
- */
-
- <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
- throws T;
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
deleted file mode 100644
index b625e76..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.testing.ControlsInjector;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * Implementation of the context used by low-level operator
- * tasks.
- */
-
-public class OperExecContextImpl implements OperExecContext {
-
- private FragmentExecContext fragmentContext;
- private PhysicalOperator operDefn;
- private ControlsInjector injector;
- private BufferAllocator allocator;
- private OperatorStatReceiver stats;
-
- public OperExecContextImpl(FragmentExecContext fragContext, OperatorContext opContext, PhysicalOperator opDefn, ControlsInjector injector) {
- this(fragContext, opContext.getAllocator(), opContext.getStats(), opDefn, injector);
- }
-
- public OperExecContextImpl(FragmentExecContext fragContext, BufferAllocator allocator, OperatorStatReceiver stats, PhysicalOperator opDefn, ControlsInjector injector) {
- this.fragmentContext = fragContext;
- this.operDefn = opDefn;
- this.injector = injector;
- this.allocator = allocator;
- this.stats = stats;
- }
-
- @Override
- public FunctionImplementationRegistry getFunctionRegistry() {
- return fragmentContext.getFunctionRegistry();
- }
-
- @Override
- public OptionSet getOptionSet() {
- return fragmentContext.getOptionSet();
- }
-
- @Override
- public <T> T getImplementationClass(ClassGenerator<T> cg)
- throws ClassTransformationException, IOException {
- return fragmentContext.getImplementationClass(cg);
- }
-
- @Override
- public <T> T getImplementationClass(CodeGenerator<T> cg)
- throws ClassTransformationException, IOException {
- return fragmentContext.getImplementationClass(cg);
- }
-
- @Override
- public <T> List<T> getImplementationClass(ClassGenerator<T> cg,
- int instanceCount) throws ClassTransformationException, IOException {
- return fragmentContext.getImplementationClass(cg, instanceCount);
- }
-
- @Override
- public <T> List<T> getImplementationClass(CodeGenerator<T> cg,
- int instanceCount) throws ClassTransformationException, IOException {
- return fragmentContext.getImplementationClass(cg, instanceCount);
- }
-
- @Override
- public boolean shouldContinue() {
- return fragmentContext.shouldContinue();
- }
-
- @Override
- public ExecutionControls getExecutionControls() {
- return fragmentContext.getExecutionControls();
- }
-
- @Override
- public BufferAllocator getAllocator() {
- return allocator;
- }
-
- @Override
- public OperatorStatReceiver getStats() {
- return stats;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends PhysicalOperator> T getOperatorDefn() {
- return (T) operDefn;
- }
-
- @Override
- public DrillConfig getConfig() {
- return fragmentContext.getConfig();
- }
-
- @Override
- public ControlsInjector getInjector() {
- return injector;
- }
-
- @Override
- public void injectUnchecked(String desc) {
- ExecutionControls executionControls = fragmentContext.getExecutionControls();
- if (injector != null && executionControls != null) {
- injector.injectUnchecked(executionControls, desc);
- }
- }
-
- @Override
- public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
- throws T {
- ExecutionControls executionControls = fragmentContext.getExecutionControls();
- if (injector != null && executionControls != null) {
- injector.injectChecked(executionControls, desc, exceptionClass);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index b248d5f..37653e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -21,13 +21,70 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.util.concurrent.ListenableFuture;
-public interface OperatorContext extends OperatorExecContext {
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Per-operator services available for operator implementations.
+ * The services allow access to the operator definition, to the
+ * fragment context, and to per-operator services.
+ * <p>
+ * Use this interface in code to allow unit tests to provide
+ * test-time implementations of this context.
+ */
+
+public interface OperatorContext {
+
+ /**
+ * Return the physical operator definition created by the planner and passed
+ * into the Drillbit executing the query.
+ * @return the physical operator definition
+ */
+
+ <T extends PhysicalOperator> T getOperatorDefn();
+
+ /**
+ * Return the memory allocator for this operator.
+ *
+ * @return the per-operator memory allocator
+ */
+
+ BufferAllocator getAllocator();
+
+ FragmentContextInterface getFragmentContext();
+
+ DrillBuf replace(DrillBuf old, int newSize);
+
+ DrillBuf getManagedBuffer();
+
+ DrillBuf getManagedBuffer(int size);
+
+ ExecutionControls getExecutionControls();
+
+ /**
+ * A write-only interface to the Drill statistics mechanism. Allows
+ * operators to update statistics.
+ * @return operator statistics
+ */
+
+ OperatorStatReceiver getStatsWriter();
+
+ /**
+ * Full operator stats (for legacy code). Prefer
+ * <tt>getStatsWriter()</tt> to allow code to easily run in a
+ * test environment.
+ *
+ * @return operator statistics
+ */
OperatorStats getStats();
@@ -51,4 +108,40 @@ public interface OperatorContext extends OperatorExecContext {
*/
<RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
Callable<RESULT> callable);
- }
+
+ void setInjector(ControlsInjector injector);
+
+ /**
+ * Returns the fault injection mechanism used to introduce faults at runtime
+ * for testing.
+ * @return the fault injector
+ */
+
+ ControlsInjector getInjector();
+
+ /**
+ * Insert an unchecked fault (exception). Handles the details of checking if
+ * fault injection is enabled and this particular fault is selected.
+ * @param desc the description of the fault used to match a fault
+ * injection parameter to determine if the fault should be injected
+ * @throws RuntimeException an unchecked exception if the fault is enabled
+ */
+
+ void injectUnchecked(String desc);
+
+ /**
+ * Insert a checked fault (exception) of the given class. Handles the details
+ * of checking if fault injection is enabled and this particular fault is
+ * selected.
+ *
+ * @param desc the description of the fault used to match a fault
+ * injection parameter to determine if the fault should be injected
+ * @param exceptionClass the class of exception to be thrown
+ * @throws T if the fault is enabled
+ */
+
+ <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
+ throws T;
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 37c609e..bc85c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,33 +17,23 @@
*/
package org.apache.drill.exec.ops;
-import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.work.WorkManager;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable {
+class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
private boolean closed = false;
private final OperatorStats stats;
- private DrillFileSystem fs;
- private final ExecutorService executor;
- private final ExecutorService scanExecutor;
- private final ExecutorService scanDecodeExecutor;
/**
* This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There
@@ -59,9 +49,10 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
throws OutOfMemoryException {
- super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
- popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
- popConfig, context.getExecutionControls(), stats);
+ super(context,
+ context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+ popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
+ popConfig);
if (stats != null) {
this.stats = stats;
} else {
@@ -70,25 +61,6 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
OperatorUtilities.getChildCount(popConfig));
this.stats = context.getStats().newOperatorStats(def, allocator);
}
- executor = context.getDrillbitContext().getExecutor();
- scanExecutor = context.getDrillbitContext().getScanExecutor();
- scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
- }
-
- // Allow an operator to use the thread pool
- @Override
- public ExecutorService getExecutor() {
- return executor;
- }
-
- @Override
- public ExecutorService getScanExecutor() {
- return scanExecutor;
- }
-
- @Override
- public ExecutorService getScanDecodeExecutor() {
- return scanDecodeExecutor;
}
public boolean isClosed() {
@@ -98,23 +70,15 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
@Override
public void close() {
if (closed) {
- logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+ logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? getName() : null);
return;
}
- logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+ logger.debug("Closing context for {}", popConfig != null ? getName() : null);
- closed = true;
try {
super.close();
} finally {
- if (fs != null) {
- try {
- fs.close();
- fs = null;
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
- }
- }
+ closed = true;
}
}
@@ -124,11 +88,16 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
}
@Override
+ public OperatorStatReceiver getStatsWriter() {
+ return stats;
+ }
+
+ @Override
public <RESULT> ListenableFuture<RESULT> runCallableAs(final UserGroupInformation proxyUgi,
final Callable<RESULT> callable) {
synchronized (this) {
if (delegatePool == null) {
- delegatePool = MoreExecutors.listeningDecorator(executor);
+ delegatePool = MoreExecutors.listeningDecorator(getExecutor());
}
}
return delegatePool.submit(new Callable<RESULT>() {
@@ -152,21 +121,4 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato
}
});
}
-
- @Override
- public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
- Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
- fs = new DrillFileSystem(conf, getStats());
- return fs;
- }
-
- /**
- * Creates a DrillFileSystem that does not automatically track operator stats.
- */
- @Override
- public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
- Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
- fs = new DrillFileSystem(conf, null);
- return fs;
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
deleted file mode 100644
index 4d64aba..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Narrowed version of the {@link OperatorContext} used to create an
- * easy-to-test version of the operator context that excludes services
- * that require a full Drillbit server.
- */
-
-public interface OperatorExecContext {
-
- DrillBuf replace(DrillBuf old, int newSize);
-
- DrillBuf getManagedBuffer();
-
- DrillBuf getManagedBuffer(int size);
-
- BufferAllocator getAllocator();
-
- ExecutionControls getExecutionControls();
-
- OperatorStatReceiver getStatsWriter();
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
index 6aa8d76..4dba2c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java
@@ -67,4 +67,8 @@ public interface OperatorStatReceiver {
*/
void setDoubleStat(MetricDef metric, double value);
+
+ void startWait();
+
+ void stopWait();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index b3c9ff9..1b96f28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -169,6 +169,7 @@ public class OperatorStats implements OperatorStatReceiver {
inProcessing = false;
}
+ @Override
public synchronized void startWait() {
assert !inWait : assertionError("starting waiting");
stopProcessing();
@@ -176,6 +177,7 @@ public class OperatorStats implements OperatorStatReceiver {
waitMark = System.nanoTime();
}
+ @Override
public synchronized void stopWait() {
assert inWait : assertionError("stopping waiting");
startProcessing();
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6c2c171..77e9ea4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorExecContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -319,9 +318,9 @@ public class ScanBatch implements CloseableRecordBatch {
private final VectorContainer container;
- private final OperatorExecContext oContext;
+ private final OperatorContext oContext;
- public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) {
+ public Mutator(OperatorContext oContext, BufferAllocator allocator, VectorContainer container) {
this.oContext = oContext;
this.allocator = allocator;
this.container = container;
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
index ccaca98..733ea5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
public interface SingleBatchSorter {
- public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
+ public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
public void sort(SelectionVector2 vector2) throws SchemaChangeException;
public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION =
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 672dd2b..0f4680d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
private SelectionVector2 vector2;
@Override
- public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
+ public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
Preconditions.checkNotNull(vector2);
this.vector2 = vector2;
try {
@@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
}
}
- public abstract void doSetup(@Named("context") FragmentExecContext context,
+ public abstract void doSetup(@Named("context") FragmentContextInterface context,
@Named("incoming") VectorAccessible incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
index 1f381b9..338462e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.record.VectorAccessible;
@@ -45,7 +45,7 @@ public abstract class BaseSortWrapper extends BaseWrapper {
protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- public BaseSortWrapper(OperExecContext opContext) {
+ public BaseSortWrapper(OperatorContext opContext) {
super(opContext);
}
@@ -56,7 +56,8 @@ public abstract class BaseSortWrapper extends BaseWrapper {
for (Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,
+ context.getFragmentContext().getFunctionRegistry());
if (collector.hasErrors()) {
throw UserException.unsupportedError()
.message("Failure while materializing expression. " + collector.toErrorString())
@@ -71,7 +72,7 @@ public abstract class BaseSortWrapper extends BaseWrapper {
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
- context.getFunctionRegistry());
+ context.getFragmentContext().getFunctionRegistry());
HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
index e607f40..0287059 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
/**
* Base class for code-generation-based tasks.
@@ -30,15 +30,15 @@ import org.apache.drill.exec.ops.OperExecContext;
public abstract class BaseWrapper {
- protected OperExecContext context;
+ protected OperatorContext context;
- public BaseWrapper(OperExecContext context) {
+ public BaseWrapper(OperatorContext context) {
this.context = context;
}
protected <T> T getInstance(CodeGenerator<T> cg, org.slf4j.Logger logger) {
try {
- return context.getImplementationClass(cg);
+ return context.getFragmentContext().getImplementationClass(cg);
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely code error.")
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
index c930877..f26f6b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -55,9 +55,9 @@ public class BufferedBatches {
private BatchSchema schema;
- private final OperExecContext context;
+ private final OperatorContext context;
- public BufferedBatches(OperExecContext opContext) {
+ public BufferedBatches(OperatorContext opContext) {
context = opContext;
sorterWrapper = new SorterWrapper(opContext);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 6a97c29..2054c9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperExecContext;
-import org.apache.drill.exec.ops.OperExecContextImpl;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
@@ -218,10 +216,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SortConfig sortConfig = new SortConfig(context.getConfig());
SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
- OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
- PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
- SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
- sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container);
+ oContext.setInjector(injector);
+ PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
+ SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
+ sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
// The upstream operator checks on record count before we have
// results. Create an empty result set temporarily to handle
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 5b07c4a..698e32f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
*/
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
- private FragmentExecContext context;
+ private FragmentContextInterface context;
/**
* Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
private int desiredRecordBatchCount;
@Override
- public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
+ public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4,
final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
@@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
}
}
- public abstract void doSetup(@Named("context") FragmentExecContext context,
+ public abstract void doSetup(@Named("context") FragmentContextInterface context,
@Named("incoming") VectorContainer incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 71ae29e..428f6f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.FragmentContextInterface;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
*/
public interface MSorter {
- public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4,
+ public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4,
VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
public void sort();
public SelectionVector4 getSV4();
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 01135f0..f592e44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -87,7 +87,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
private State state = State.FIRST;
private final VectorContainer destContainer;
- public MergeSortWrapper(OperExecContext opContext, VectorContainer destContainer) {
+ public MergeSortWrapper(OperatorContext opContext, VectorContainer destContainer) {
super(opContext);
this.destContainer = destContainer;
}
@@ -123,7 +123,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
sv4 = builder.getSv4();
Sort popConfig = context.getOperatorDefn();
mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
- mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize);
+ mSorter.setup(context.getFragmentContext(), context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize);
} catch (SchemaChangeException e) {
throw UserException.unsupportedError(e)
.message("Unexpected schema change - likely code error.")
@@ -142,7 +142,9 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
}
private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
- CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+ CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION,
+ context.getFragmentContext().getFunctionRegistry(),
+ context.getFragmentContext().getOptionSet());
cg.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
@@ -153,7 +155,8 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector,
+ context.getFragmentContext().getFunctionRegistry());
if (collector.hasErrors()) {
throw UserException.unsupportedError()
.message("Failure while materializing expression. " + collector.toErrorString())
@@ -168,7 +171,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
- context.getFunctionRegistry());
+ context.getFragmentContext().getFunctionRegistry());
HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 88686e5..ab8cc9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -30,14 +30,14 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -66,7 +66,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
private PriorityQueueCopier copier;
- public PriorityQueueCopierWrapper(OperExecContext opContext) {
+ public PriorityQueueCopierWrapper(OperatorContext opContext) {
super(opContext);
}
@@ -80,7 +80,9 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
private PriorityQueueCopier newCopier(VectorAccessible batch) {
// Generate the copier code and obtain the resulting class
- CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+ CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION,
+ context.getFragmentContext().getFunctionRegistry(),
+ context.getFragmentContext().getOptionSet());
ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
cg.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index d2b589c..2d53c3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -21,17 +21,17 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -174,7 +174,7 @@ public class SortImpl {
private final SortMetrics metrics;
private final SortMemoryManager memManager;
private VectorContainer outputBatch;
- private OperExecContext context;
+ private OperatorContext context;
/**
* Memory allocator for this operator itself. Incoming batches are
@@ -192,7 +192,7 @@ public class SortImpl {
private VectorInitializer allocHelper;
- public SortImpl(OperExecContext opContext, SortConfig sortConfig,
+ public SortImpl(OperatorContext opContext, SortConfig sortConfig,
SpilledRuns spilledRuns, VectorContainer batch) {
this.context = opContext;
outputBatch = batch;
@@ -200,7 +200,7 @@ public class SortImpl {
allocator = opContext.getAllocator();
config = sortConfig;
memManager = new SortMemoryManager(config, allocator.getLimit());
- metrics = new SortMetrics(opContext.getStats());
+ metrics = new SortMetrics(opContext.getStatsWriter());
bufferedBatches = new BufferedBatches(opContext);
// Request leniency from the allocator. Leniency
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
index 1233de8..8d20cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
@@ -44,6 +44,7 @@ public class SortMetrics {
private long writeBytes;
public SortMetrics(OperatorStatReceiver stats) {
+ assert stats != null;
this.stats = stats;
}