You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/26 12:44:32 UTC
[08/11] drill git commit: DRILL-5730: Mock testing improvements and
interface improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
new file mode 100644
index 0000000..b3c84bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
+import org.apache.drill.exec.server.options.FragmentOptionManager;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Contextual objects required for execution of a particular fragment.
+ * This is the implementation; use <tt>FragmentContext</tt>
+ * in code to allow tests to use test-time implementations.
+ */
+public class FragmentContextImpl extends BaseFragmentContext implements ExecutorFragmentContext {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContextImpl.class);
+
+ private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
+ private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
+
+ private final DrillbitContext context;
+ private final UserClientConnection connection; // is null if this context is for non-root fragment
+ private final QueryContext queryContext; // is null if this context is for non-root fragment
+ private final FragmentStats stats;
+ private final BufferAllocator allocator;
+ private final PlanFragment fragment;
+ private final ContextInformation contextInformation;
+ private IncomingBuffers buffers;
+ private final OptionManager fragmentOptions;
+ private final BufferManager bufferManager;
+ private ExecutorState executorState;
+ private final ExecutionControls executionControls;
+
+ private final SendingAccountor sendingAccountor = new SendingAccountor();
+ private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
+ @Override
+ public void accept(final RpcException e) {
+ fail(e);
+ }
+
+ @Override
+ public void interrupt(final InterruptedException e) {
+ if (executorState.shouldContinue()) {
+ logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
+ fail(e);
+ }
+ }
+ };
+
+ private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+ private final AccountingUserConnection accountingUserConnection;
+ /** Stores constants and their holders by type */
+ private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
+
+ /**
+ * Create a FragmentContext instance for non-root fragment.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException
+ */
+ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment,
+ final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, null, funcRegistry);
+ }
+
+ /**
+ * Create a FragmentContext instance for root fragment.
+ *
+ * <p>Reads the fragment's serialized options (when present), then sets up the execution
+ * controls, a child allocator named after the fragment id, the fragment stats, the buffer
+ * manager and the constant value holder cache.
+ *
+ * <p>If the child allocator cannot be created because of an {@link OutOfMemoryException},
+ * a memory-error {@link UserException} tagged with the fragment id is thrown instead.
+ *
+ * @param dbContext DrillbitContext.
+ * @param fragment Fragment implementation.
+ * @param queryContext QueryContext; null if this context is for a non-root fragment.
+ * @param connection UserClientConnection; null if this context is for a non-root fragment.
+ * @param funcRegistry FunctionImplementationRegistry.
+ * @throws ExecutionSetupException if the plan options cannot be read, or if obtaining the
+ * child allocator fails for any reason other than running out of memory
+ */
+ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
+ final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
+ throws ExecutionSetupException {
+ super(funcRegistry);
+ this.context = dbContext;
+ this.queryContext = queryContext;
+ this.connection = connection;
+ this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
+ this.fragment = fragment;
+ contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
+
+ logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
+ logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+
+ // An absent or empty options JSON means there are no fragment-level option overrides.
+ final OptionList list;
+ if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
+ list = new OptionList();
+ } else {
+ try {
+ list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+ } catch (final Exception e) {
+ throw new ExecutionSetupException("Failure while reading plan options.", e);
+ }
+ }
+ fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+
+ executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
+
+ // Add the fragment context to the root allocator.
+ // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
+ try {
+ allocator = context.getAllocator().newChildAllocator(
+ "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
+ fragment.getMemInitial(),
+ fragment.getMemMax());
+ Preconditions.checkNotNull(allocator, "Unable to acquire allocator");
+ } catch (final OutOfMemoryException e) {
+ throw UserException.memoryError(e)
+ .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
+ .build(logger);
+ } catch(final Throwable e) {
+ // Also covers the null-allocator check above, which is reported as a setup failure.
+ throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
+ }
+
+ stats = new FragmentStats(allocator, fragment.getAssignment());
+ bufferManager = new BufferManagerImpl(this.allocator);
+ constantValueHolderCache = Maps.newHashMap();
+ }
+
+ /**
+ * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+ * the long list of test files.
+ */
+ public FragmentContextImpl(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+ FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+ this(dbContext, fragment, null, connection, funcRegistry);
+ }
+
+ @Override
+ public OptionManager getOptions() {
+ return fragmentOptions;
+ }
+
+ @Override
+ public PhysicalPlanReader getPlanReader() {
+ return context.getPlanReader();
+ }
+
+ @Override
+ public ClusterCoordinator getClusterCoordinator() {
+ return context.getClusterCoordinator();
+ }
+
+ @Override
+ public void setBuffers(final IncomingBuffers buffers) {
+ Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
+ this.buffers = buffers;
+ }
+ public QueryProfileStoreContext getProfileStoreContext() {
+ return context.getProfileStoreContext();
+ }
+
+ @Override
+ public Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections() {
+ return context.getUserConnections();
+ }
+
+ public void setExecutorState(final ExecutorState executorState) {
+ Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+ this.executorState = executorState;
+ }
+
+ public void fail(final Throwable cause) {
+ executorState.fail(cause);
+ }
+
+ /**
+ * Builds the full root schema through the query context.
+ *
+ * @return the full root schema, or null (after reporting a failure through
+ * {@link #fail(Throwable)}) when this context has no query context, i.e. it
+ * belongs to a non-root fragment
+ */
+ public SchemaPlus getFullRootSchema() {
+ if (queryContext == null) {
+ fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
+ "This is a non-root fragment."));
+ return null;
+ }
+
+ // With impersonation enabled the schema is viewed as the query user and authorization errors are
+ // suppressed, so that InfoSchema only shows the tables this user is allowed to list or query.
+ // With impersonation disabled the schema is viewed as the Drillbit process user and authorization
+ // errors are thrown to the client.
+ final boolean impersonating = isImpersonationEnabled();
+ return queryContext.getFullRootSchema(
+ SchemaConfig
+ .newBuilder(
+ impersonating ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
+ queryContext)
+ .setIgnoreAuthErrors(impersonating)
+ .build());
+ }
+
+ public FragmentStats getStats() {
+ return stats;
+ }
+
+ @Override
+ public Collection<DrillbitEndpoint> getBits() {
+ return context.getBits();
+ }
+
+ @Override
+ public ContextInformation getContextInformation() {
+ return contextInformation;
+ }
+
+ @Override
+ public DrillbitEndpoint getForemanEndpoint() {
+ return fragment.getForeman();
+ }
+
+ @Override
+ public DrillbitEndpoint getEndpoint() {
+ return context.getEndpoint();
+ }
+
+ @Override
+ public Controller getController() {
+ return context.getController();
+ }
+
+ @Override
+ public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+ return context.getOperatorCreatorRegistry();
+ }
+
+ @Override
+ public ExecutorService getScanDecodeExecutor() {
+ return context.getScanDecodeExecutor();
+ }
+
+ @Override
+ public ExecutorService getScanExecutor() {
+ return context.getScanExecutor();
+ }
+
+ /**
+ * The FragmentHandle for this Fragment
+ * @return FragmentHandle
+ */
+ public FragmentHandle getHandle() {
+ return fragment.getHandle();
+ }
+
+ /**
+ * Formats this fragment's id as "major:minor".
+ * @return the major and minor fragment ids joined by a colon, or "0:0" when there is no handle
+ */
+ public String getFragIdString() {
+ final FragmentHandle fragmentHandle = getHandle();
+ if (fragmentHandle == null) {
+ return "0:0";
+ }
+ return fragmentHandle.getMajorFragmentId() + ":" + fragmentHandle.getMinorFragmentId();
+ }
+
+ /**
+ * Tells whether user authentication is enabled in the configuration.
+ * @return the value of {@link ExecConstants#USER_AUTHENTICATION_ENABLED}, or false when there is no config
+ */
+ @Override
+ public boolean isUserAuthenticationEnabled() {
+ // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider user authentication
+ // disabled if there is no config
+ final DrillConfig config = getConfig();
+ return config != null && config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
+ }
+
+ /**
+ * Get this fragment's allocator.
+ * @return the allocator
+ */
+ @Deprecated
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ // Parameterized message, consistent with the other debug statements in this class.
+ logger.debug("Fragment: {} Allocator is NULL", getFragIdString());
+ }
+ return allocator;
+ }
+
+ @Override
+ public BufferAllocator getNewChildAllocator(final String operatorName,
+ final int operatorId,
+ final long initialReservation,
+ final long maximumReservation) throws OutOfMemoryException {
+ return allocator.newChildAllocator(
+ "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
+ initialReservation,
+ maximumReservation
+ );
+ }
+
+ public boolean isOverMemoryLimit() {
+ return allocator.isOverLimit();
+ }
+
+ @Override
+ public CodeCompiler getCompiler() {
+ return context.getCompiler();
+ }
+
+ @Override
+ public AccountingUserConnection getUserDataTunnel() {
+ Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
+ return accountingUserConnection;
+ }
+
+ /**
+ * Returns the accounting data tunnel for the given endpoint, creating and caching it on first request.
+ * @param endpoint the Drillbit endpoint whose tunnel is requested
+ * @return the cached or newly created tunnel for that endpoint
+ */
+ @Override
+ public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
+ final AccountingDataTunnel cached = tunnels.get(endpoint);
+ if (cached != null) {
+ return cached;
+ }
+
+ final AccountingDataTunnel created =
+ new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
+ tunnels.put(endpoint, created);
+ return created;
+ }
+
+ public IncomingBuffers getBuffers() {
+ return buffers;
+ }
+
+ /**
+ * Creates an operator context for the given operator using the supplied stats, and records it
+ * in this fragment's list of operator contexts.
+ * @param popConfig the operator definition
+ * @param stats the operator stats handed to the new context
+ * @return the newly created operator context
+ * @throws OutOfMemoryException declared for failures while creating the operator context
+ */
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
+ throws OutOfMemoryException {
+ // Named opContext so the local does not shadow the DrillbitContext field "context".
+ final OperatorContextImpl opContext = new OperatorContextImpl(popConfig, this, stats);
+ contexts.add(opContext);
+ return opContext;
+ }
+
+ /**
+ * Creates an operator context for the given operator and records it in this fragment's list
+ * of operator contexts.
+ * @param popConfig the operator definition
+ * @return the newly created operator context
+ * @throws OutOfMemoryException declared for failures while creating the operator context
+ */
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig)
+ throws OutOfMemoryException {
+ // Named opContext so the local does not shadow the DrillbitContext field "context".
+ final OperatorContextImpl opContext = new OperatorContextImpl(popConfig, this);
+ contexts.add(opContext);
+ return opContext;
+ }
+
+ @Override
+ public DrillConfig getConfig() {
+ return context.getConfig();
+ }
+
+ @Override
+ public ExecutorState getExecutorState() {
+ return executorState;
+ }
+
+ @Override
+ public ExecutionControls getExecutionControls() {
+ return executionControls;
+ }
+
+ public String getQueryUserName() {
+ return fragment.getCredentials().getUserName();
+ }
+
+ /**
+ * Tells whether impersonation is enabled in the configuration.
+ * @return the value of {@link ExecConstants#IMPERSONATION_ENABLED}, or false when there is no config
+ */
+ public boolean isImpersonationEnabled() {
+ // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation
+ // disabled if there is no config
+ final DrillConfig config = getConfig();
+ return config != null && config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+ }
+
+ /**
+ * Closes this fragment context. Calls {@link #waitForSendComplete()} first, then closes every
+ * operator context created through this fragment, followed by the buffer manager, the incoming
+ * buffers (skipped when never set) and finally the fragment allocator. An exception raised while
+ * closing one of these resources is passed to {@link #fail(Throwable)} instead of being thrown,
+ * so the remaining resources are still closed.
+ */
+ @Override
+ public void close() {
+ waitForSendComplete();
+
+ // close operator context
+ for (OperatorContextImpl opContext : contexts) {
+ suppressingClose(opContext);
+ }
+
+ // Closed in this order: buffer manager, incoming buffers, then the fragment allocator last.
+ suppressingClose(bufferManager);
+ suppressingClose(buffers);
+ suppressingClose(allocator);
+ }
+
+ /**
+ * Closes the given resource, routing any exception to {@link #fail(Throwable)} rather than
+ * propagating it. A null resource is ignored.
+ * @param closeable the resource to close; may be null
+ */
+ private void suppressingClose(final AutoCloseable closeable) {
+ if (closeable == null) {
+ return;
+ }
+
+ try {
+ closeable.close();
+ } catch (final Exception e) {
+ fail(e);
+ }
+ }
+
+ @Override
+ public PartitionExplorer getPartitionExplorer() {
+ throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " +
+ "in functions that can be evaluated at planning time. Make sure that the %s configuration " +
+ "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName()));
+ }
+
+ /**
+ * Returns the cached holder for a constant of the given value and type, creating it with the
+ * supplied initializer on first request.
+ * @param value the constant's value, used as the outer cache key
+ * @param type the constant's minor type, used as the inner cache key
+ * @param holderInitializer builds the holder from a managed buffer when nothing is cached yet
+ * @return the cached or newly created holder
+ */
+ @Override
+ public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) {
+ Map<MinorType, ValueHolder> holdersByType = constantValueHolderCache.get(value);
+ if (holdersByType == null) {
+ holdersByType = Maps.newHashMap();
+ constantValueHolderCache.put(value, holdersByType);
+ }
+
+ final ValueHolder cached = holdersByType.get(type);
+ if (cached != null) {
+ return cached;
+ }
+
+ final ValueHolder created = holderInitializer.apply(getManagedBuffer());
+ holdersByType.put(type, created);
+ return created;
+ }
+
+ public ExecutorService getExecutor(){
+ return context.getExecutor();
+ }
+
+ @Override
+ public void waitForSendComplete() {
+ sendingAccountor.waitForSendComplete();
+ }
+
+ @Override
+ public WorkEventBus getWorkEventbus() {
+ return context.getWorkBus();
+ }
+
+ public boolean isBuffersDone() {
+ Preconditions.checkState(this.buffers != null, "Incoming Buffers is not set in this fragment context");
+ return buffers.isDone();
+ }
+
+ @Override
+ protected BufferManager getBufferManager() {
+ return bufferManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 3d2fdd8..e6ea5b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -60,7 +60,7 @@ public interface OperatorContext {
BufferAllocator getAllocator();
- FragmentContextInterface getFragmentContext();
+ FragmentContext getFragmentContext();
DrillBuf replace(DrillBuf old, int newSize);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index e4c7dd9..550cea2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -43,11 +43,11 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
*/
private ListeningExecutorService delegatePool;
- public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context) throws OutOfMemoryException {
this(popConfig, context, null);
}
- public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context, OperatorStats stats)
throws OutOfMemoryException {
super(context,
context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
@@ -74,12 +74,8 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
return;
}
logger.debug("Closing context for {}", popConfig != null ? getName() : null);
-
- try {
- super.close();
- } finally {
- closed = true;
- }
+ closed = true;
+ super.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index a38c3c2..e744f1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -35,8 +35,6 @@ import com.carrotsearch.hppc.procedures.IntLongProcedure;
import com.google.common.annotations.VisibleForTesting;
public class OperatorStats {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
-
protected final int operatorId;
protected final int operatorType;
private final BufferAllocator allocator;
@@ -61,7 +59,6 @@ public class OperatorStats {
private long setupMark;
private long waitMark;
-// private long schemas;
private int inputCount;
public OperatorStats(OpProfileDef def, BufferAllocator allocator){
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
new file mode 100644
index 0000000..a8dab2c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+/**
+ * Extension of {@link ExchangeFragmentContext} that additionally exposes the
+ * fragment's statistics and accepts the executor state.
+ */
+public interface RootFragmentContext extends ExchangeFragmentContext {
+ /**
+ * @return the statistics object of this fragment
+ */
+ FragmentStats getStats();
+
+ /**
+ * Supplies the executor state for this fragment.
+ * @param executorState the executor state to use
+ */
+ void setExecutorState(ExecutorState executorState);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index 6752d76..04d29e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -36,7 +36,7 @@ public interface UdfUtilities {
// Map between injectable classes and their respective getter methods
// used for code generation
- public static final ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS =
+ ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS =
new ImmutableMap.Builder<Class<?>, String>()
.put(DrillBuf.class, "getManagedBuffer")
.put(PartitionExplorer.class, "getPartitionExplorer")
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index b4c8536..b93237e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -83,12 +82,6 @@ public abstract class AbstractBase implements PhysicalOperator {
return SelectionVectorMode.NONE;
}
- // Not available. Presumably because Drill does not currently use
- // this value, though it does appear in some test physical plans.
-// public void setInitialAllocation(long alloc) {
-// initialAllocation = alloc;
-// }
-
@Override
public long getInitialAllocation() {
return initialAllocation;
@@ -116,7 +109,6 @@ public abstract class AbstractBase implements PhysicalOperator {
@Override
public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
- /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
}
/**
@@ -126,9 +118,6 @@ public abstract class AbstractBase implements PhysicalOperator {
@Override @JsonIgnore
public boolean isBufferedOperator() { return false; }
- // @Override
- // public void setBufferedOperator(boolean bo) {}
-
@Override
public String getUserName() {
return userName;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index d8014f5..35c371c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -35,7 +35,7 @@ public interface Exchange extends PhysicalOperator {
* Assignment dependency describes whether sender fragments depend on receiver fragment's endpoint assignment for
* determining its parallelization and endpoint assignment and vice versa.
*/
- public enum ParallelizationDependency {
+ enum ParallelizationDependency {
SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving fragment for parallelization
RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending fragment for parallelization (default value).
}
@@ -46,7 +46,7 @@ public interface Exchange extends PhysicalOperator {
*
* @param senderLocations
*/
- public abstract void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException;
+ void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException;
/**
* Inform this Exchange node about its receiver locations. This list should be index-ordered the same as the expected
@@ -54,7 +54,7 @@ public interface Exchange extends PhysicalOperator {
*
* @param receiverLocations
*/
- public abstract void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException;
+ void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException;
/**
* Get the Sender associated with the given minorFragmentId. Cannot be called until after setupSenders() and
@@ -66,7 +66,7 @@ public interface Exchange extends PhysicalOperator {
* The feeding node for the requested sender.
* @return The materialized sender for the given arguments.
*/
- public abstract Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException;
+ Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException;
/**
* Get the Receiver associated with the given minorFragmentId. Cannot be called until after setupSenders() and
@@ -76,7 +76,7 @@ public interface Exchange extends PhysicalOperator {
* The minor fragment id, must be in the range [0, fragment.width).
* @return The materialized receiver for the given arguments.
*/
- public abstract Receiver getReceiver(int minorFragmentId);
+ Receiver getReceiver(int minorFragmentId);
/**
* Provide parallelization parameters for sender side of the exchange. Output includes min width,
@@ -86,7 +86,7 @@ public interface Exchange extends PhysicalOperator {
* @return
*/
@JsonIgnore
- public abstract ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints);
+ ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints);
/**
* Provide parallelization parameters for receiver side of the exchange. Output includes min width,
@@ -96,18 +96,18 @@ public interface Exchange extends PhysicalOperator {
* @return
*/
@JsonIgnore
- public abstract ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints);
+ ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints);
/**
* Return the feeding child of this operator node.
*
* @return
*/
- public PhysicalOperator getChild();
+ PhysicalOperator getChild();
/**
* Get the parallelization dependency of the Exchange.
*/
@JsonIgnore
- public ParallelizationDependency getParallelizationDependency();
+ ParallelizationDependency getParallelizationDependency();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 980f32c..7b76194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -47,7 +47,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
* @return
*/
@JsonIgnore
- public boolean isExecutable();
+ boolean isExecutable();
/**
* Describes the SelectionVector Mode for the output stream from this physical op.
@@ -55,7 +55,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
* @return
*/
@JsonIgnore
- public SelectionVectorMode getSVMode();
+ SelectionVectorMode getSVMode();
/**
* Provides capability to build a set of output based on traversing a query graph tree.
@@ -63,7 +63,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
* @param physicalVisitor
* @return
*/
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
+ <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
/**
* Regenerate this node with a new set of children. This is used in the case of materialization or optimization.
@@ -71,44 +71,44 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
* @return
*/
@JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
+ PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
/**
* @return The memory to preallocate for this operator
*/
- public long getInitialAllocation();
+ long getInitialAllocation();
/**
* @return The maximum memory this operator can allocate
*/
- public long getMaxAllocation();
+ long getMaxAllocation();
/**
*
* @param maxAllocation The max memory allocation to be set
*/
- public void setMaxAllocation(long maxAllocation);
+ void setMaxAllocation(long maxAllocation);
/**
*
* @return True iff this operator manages its memory (including disk spilling)
*/
@JsonIgnore
- public boolean isBufferedOperator();
+ boolean isBufferedOperator();
// public void setBufferedOperator(boolean bo);
@JsonProperty("@id")
- public int getOperatorId();
+ int getOperatorId();
@JsonProperty("@id")
- public void setOperatorId(int id);
+ void setOperatorId(int id);
@JsonProperty("cost")
- public void setCost(double cost);
+ void setCost(double cost);
@JsonProperty("cost")
- public double getCost();
+ double getCost();
/**
* Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
@@ -116,8 +116,8 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
* @return
*/
@JsonProperty("userName")
- public String getUserName();
+ String getUserName();
@JsonIgnore
- public int getOperatorType();
+ int getOperatorType();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index d01e294..82887ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -21,11 +21,11 @@ import java.util.List;
import org.apache.drill.common.DeferredException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.OperatorUtilities;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -39,10 +39,10 @@ public abstract class BaseRootExec implements RootExec {
protected OperatorStats stats = null;
protected OperatorContext oContext = null;
- protected FragmentContext fragmentContext = null;
+ protected RootFragmentContext fragmentContext = null;
private List<CloseableRecordBatch> operators;
- public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = fragmentContext.newOperatorContext(config, stats);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -51,8 +51,8 @@ public abstract class BaseRootExec implements RootExec {
this.fragmentContext = fragmentContext;
}
- public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext,
- final PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext,
+ final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -87,7 +87,7 @@ public abstract class BaseRootExec implements RootExec {
public final boolean next() {
// Stats should have been initialized
assert stats != null;
- if (!fragmentContext.shouldContinue()) {
+ if (!fragmentContext.getExecutorState().shouldContinue()) {
return false;
}
try {
@@ -156,7 +156,7 @@ public abstract class BaseRootExec implements RootExec {
try {
df.close();
} catch (Exception e) {
- fragmentContext.fail(e);
+ fragmentContext.getExecutorState().fail(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index af99b5e..e9113ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,14 +20,12 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public interface BatchCreator<T extends PhysicalOperator> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
-
- public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children)
+ CloseableRecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children)
throws ExecutionSetupException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index b418fd4..2e0d14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -66,7 +66,7 @@ public class ImplCreator {
* @return RootExec of fragment.
* @throws ExecutionSetupException
*/
- public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+ public static RootExec getExec(ExecutorFragmentContext context, FragmentRoot root) throws ExecutionSetupException {
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(context);
@@ -99,14 +99,14 @@ public class ImplCreator {
return rootExec;
} catch(Exception e) {
AutoCloseables.close(e, creator.getOperators());
- context.fail(e);
+ context.getExecutorState().fail(e);
}
return null;
}
/** Create RootExec and its children (RecordBatches) for given FragmentRoot */
- @SuppressWarnings("unchecked")
- private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+
+ private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException {
final List<RecordBatch> childRecordBatches = getChildren(root, context);
if (context.isImpersonationEnabled()) {
@@ -131,7 +131,7 @@ public class ImplCreator {
/** Create a RecordBatch and its children for given PhysicalOperator */
@VisibleForTesting
- public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(op);
final List<RecordBatch> childRecordBatches = getChildren(op, context);
@@ -164,9 +164,9 @@ public class ImplCreator {
}
/** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */
- private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ private Object getOpCreator(PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
final Class<? extends PhysicalOperator> opClass = op.getClass();
- Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+ Object opCreator = context.getOperatorCreatorRegistry().getOperatorCreator(opClass);
if (opCreator == null) {
throw new UnsupportedOperationException(
String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
@@ -176,7 +176,7 @@ public class ImplCreator {
}
/** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */
- private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ private List<RecordBatch> getChildren(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
List<RecordBatch> children = Lists.newArrayList();
for (PhysicalOperator child : op) {
children.add(getRecordBatch(child, context));
@@ -184,5 +184,4 @@ public class ImplCreator {
return children;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 690a662..0ef84b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -33,7 +33,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
@SuppressWarnings("resource")
@Override
- public MergingRecordBatch getBatch(FragmentContext context,
+ public MergingRecordBatch getBatch(ExecutorFragmentContext context,
MergingReceiverPOP receiver,
List<RecordBatch> children)
throws ExecutionSetupException, OutOfMemoryException {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
index f3d9524..17e4027 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.RecordBatch;
public interface RootCreator<T extends PhysicalOperator> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
-
- public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+ RootExec getRoot(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index e0d1545..f6a4863 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,14 +82,13 @@ public class ScanBatch implements CloseableRecordBatch {
private String currentReaderClassName;
/**
*
- * @param subScanConfig
* @param context
* @param oContext
* @param readerList
* @param implicitColumnList : either an emptylist when all the readers do not have implicit
* columns, or there is a one-to-one mapping between reader and implicitColumns.
*/
- public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+ public ScanBatch(FragmentContext context,
OperatorContext oContext, List<RecordReader> readerList,
List<Map<String, String>> implicitColumnList) {
this.context = context;
@@ -126,8 +125,7 @@ public class ScanBatch implements CloseableRecordBatch {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
List<RecordReader> readers)
throws ExecutionSetupException {
- this(subScanConfig, context,
- context.newOperatorContext(subScanConfig),
+ this(context, context.newOperatorContext(subScanConfig),
readers, Collections.<Map<String, String>> emptyList());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index d9abf40..46dc450 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingUserConnection;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -38,11 +39,10 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import com.google.common.base.Preconditions;
public class ScreenCreator implements RootCreator<Screen> {
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class);
@Override
- public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children)
+ public RootExec getRoot(ExecutorFragmentContext context, Screen config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkNotNull(children);
Preconditions.checkArgument(children.size() == 1);
@@ -52,7 +52,7 @@ public class ScreenCreator implements RootCreator<Screen> {
public static class ScreenRoot extends BaseRootExec {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming;
- private final FragmentContext context;
+ private final RootFragmentContext context;
private final AccountingUserConnection userConnection;
private RecordMaterializer materializer;
@@ -67,13 +67,17 @@ public class ScreenCreator implements RootCreator<Screen> {
}
}
- public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+ public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
super(context, config);
this.context = context;
this.incoming = incoming;
userConnection = context.getUserDataTunnel();
}
+ public RootFragmentContext getContext() {
+ return context;
+ }
+
@Override
public boolean innerNext() {
IterOutcome outcome = next(incoming);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2f33193..9231aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
@@ -36,7 +37,7 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
- public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
+ public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return new SingleSenderRootExec(context, children.iterator().next(), config);
@@ -64,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
}
- public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+ public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
super(context, context.newOperatorContext(config, null), config);
this.incoming = batch;
assert incoming != null;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 7f9aca4..97e26b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -26,7 +26,6 @@ import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 442a753..1683286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -281,7 +281,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
} catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
}
@@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
if (copier == null) {
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null);
} else {
for (VectorWrapper<?> i : batch) {
@@ -323,7 +323,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
selectionVector4.clear();
c.clear();
VectorContainer newQueue = new VectorContainer();
- builder.build(context, newQueue);
+ builder.build(newQueue);
priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
builder.getSv4().clear();
selectionVector4.clear();
@@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
throws SchemaChangeException, ClassTransformationException, IOException {
return createNewPriorityQueue(
- mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(),
+ mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(),
config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
}
@@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null);
@SuppressWarnings("resource")
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
try {
@@ -434,7 +434,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
selectionVector4.clear();
c.clear();
final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
- builder.build(context, oldSchemaContainer);
+ builder.build(oldSchemaContainer);
oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
index e815bff..d477744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.TopN;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class TopNSortBatchCreator implements BatchCreator<TopN>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class);
-
+public class TopNSortBatchCreator implements BatchCreator<TopN> {
@Override
- public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children)
+ public TopNBatch getBatch(ExecutorFragmentContext context, TopN config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new TopNBatch(config, context, children.iterator().next());
}
-
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 0ea17d6..e98a7c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -118,7 +118,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
} catch(IOException ex) {
logger.error("Failure during query", ex);
kill(false);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
@@ -185,7 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
try {
recordWriter.cleanup();
} catch(IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
} finally {
try {
if (!processed) {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b3d68d3..47f1017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -186,7 +186,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
- context.fail(UserException.unsupportedError()
+ context.getExecutorState().fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged(
"Hash aggregate does not support schema change",
incomingSchema,
@@ -212,7 +212,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
this.aggregator = createAggregatorInternal();
return true;
} catch (SchemaChangeException | ClassTransformationException | IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
container.clear();
incoming.kill(false);
return false;
@@ -227,9 +227,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
ClassGenerator<HashAggregator> cg = top.getRoot();
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
top.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // top.saveCodeForDebugging(true);
-
container.clear();
int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
index 1397342..ed203e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class HashAggBatchCreator implements BatchCreator<HashAggregate>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class);
-
+public class HashAggBatchCreator implements BatchCreator<HashAggregate> {
@Override
- public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children)
+ public HashAggBatch getBatch(ExecutorFragmentContext context, HashAggregate config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new HashAggBatch(config, children.iterator().next(), context);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 89ba59b..ef8d9d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -300,7 +300,9 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@Override
- public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
+ RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
+ TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 16b5499..3384e67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -35,34 +35,35 @@ import org.apache.drill.exec.record.VectorContainer;
public interface HashAggregator {
- public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
+ TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
- public static enum AggOutcome {
+ enum AggOutcome {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
}
// For returning results from outputCurrentBatch
// OK - batch returned, NONE - end of data, RESTART - call again
- public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
+ enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
- public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+ void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
- public abstract IterOutcome getOutcome();
+ IterOutcome getOutcome();
- public abstract int getOutputCount();
+ int getOutputCount();
- public abstract AggOutcome doWork();
+ AggOutcome doWork();
- public abstract void cleanup();
+ void cleanup();
- public abstract boolean allFlushed();
+ boolean allFlushed();
- public abstract boolean buildComplete();
+ boolean buildComplete();
- public abstract AggIterOutcome outputCurrentBatch();
+ AggIterOutcome outputCurrentBatch();
- public abstract boolean earlyOutput();
+ boolean earlyOutput();
- public abstract RecordBatch getNewIncoming();
+ RecordBatch getNewIncoming();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index ac4b29d..c473b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -45,19 +45,15 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
private int spilledBatches;
private FragmentContext context;
private BatchSchema schema;
- private OperatorContext oContext;
private SpillSet spillSet;
- // Path spillStreamPath;
private String spillFile;
VectorAccessibleSerializable vas;
- public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+ public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
this.context = context;
this.schema = schema;
this.spilledBatches = spilledBatches;
- this.oContext = oContext;
this.spillSet = spillSet;
- //this.spillStreamPath = spillStreamPath;
this.spillFile = spillFile;
vas = new VectorAccessibleSerializable(oContext.getAllocator());
container = vas.get();
@@ -126,7 +122,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
@Override
public IterOutcome next() {
- if ( ! context.shouldContinue() ) { return IterOutcome.STOP; }
+ if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
if ( spilledBatches <= 0 ) { // no more batches to read in this partition
this.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index b33dbd6..34ab97e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -202,7 +202,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
return outcome;
case UPDATE_AGGREGATOR:
- context.fail(UserException.unsupportedError()
+ context.getExecutorState().fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
.build(logger));
close();
@@ -263,7 +263,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
this.aggregator = createAggregatorInternal();
return true;
} catch (SchemaChangeException | ClassTransformationException | IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
container.clear();
incoming.kill(false);
return false;
@@ -275,8 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.getCodeGenerator().saveCodeForDebugging(true);
container.clear();
LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
index cac5b06..864271e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class);
-
+public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate> {
@Override
- public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children)
+ public StreamingAggBatch getBatch(ExecutorFragmentContext context, StreamingAggregate config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new StreamingAggBatch(config, children.iterator().next(), context);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 61c82d8..7e13eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
index 01122be..6c42a1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.RootCreator;
import org.apache.drill.exec.physical.impl.RootExec;
@@ -30,7 +30,7 @@ import com.google.common.collect.Iterators;
public class BroadcastSenderCreator implements RootCreator<BroadcastSender> {
@Override
- public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
+ public RootExec getRoot(ExecutorFragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
assert children != null && children.size() == 1;
return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 80d7744..bd4a1ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -57,7 +57,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
}
}
- public BroadcastSenderRootExec(FragmentContext context,
+ public BroadcastSenderRootExec(RootFragmentContext context,
RecordBatch incoming,
BroadcastSender config) throws OutOfMemoryException {
super(context, context.newOperatorContext(config, null), config);
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 9bab67d..703868e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -225,12 +225,11 @@ public class ChainedHashTable {
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
HashTable ht = context.getImplementationClass(top);
- ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
+ ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
return ht;
}
-
private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
throws SchemaChangeException {
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 3749e3e..d28fe49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,64 +20,58 @@ package org.apache.drill.exec.physical.impl.common;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
public interface HashTable {
-
- public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
- new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class);
-
- /**
- * The initial default capacity of the hash table (in terms of number of buckets).
- */
- static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16;
+ TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
+ new TemplateClassDefinition<>(HashTable.class, HashTableTemplate.class);
/**
* The maximum capacity of the hash table (in terms of number of buckets).
*/
- static final public int MAXIMUM_CAPACITY = 1 << 30;
+ int MAXIMUM_CAPACITY = 1 << 30;
/**
* The default load factor of a hash table.
*/
- static final public float DEFAULT_LOAD_FACTOR = 0.75f;
+ float DEFAULT_LOAD_FACTOR = 0.75f;
- static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
+ enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
/**
* The batch size used for internal batch holders
*/
- static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
- static final public int BATCH_MASK = 0x0000FFFF;
+ int BATCH_SIZE = Character.MAX_VALUE + 1;
+ int BATCH_MASK = 0x0000FFFF;
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
+ void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
+ VectorContainer htContainerOrig);
- public void updateBatches() throws SchemaChangeException;
+ void updateBatches() throws SchemaChangeException;
- public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
+ int getHashCode(int incomingRowIdx) throws SchemaChangeException;
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
+ PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
- public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
+ int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
- public void getStats(HashTableStats stats);
+ void getStats(HashTableStats stats);
- public int size();
+ int size();
- public boolean isEmpty();
+ boolean isEmpty();
- public void clear();
+ void clear();
- public void reinit(RecordBatch newIncoming);
+ void reinit(RecordBatch newIncoming);
- public void reset();
+ void reset();
- public void setMaxVarcharSize(int size);
+ void setMaxVarcharSize(int size);
- public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+ boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 6cbbdcb..272d782 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
@@ -48,7 +47,6 @@ public abstract class HashTableTemplate implements HashTable {
private static final boolean EXTRA_DEBUG = false;
private static final int EMPTY_SLOT = -1;
- // private final int MISSING_VALUE = 65544;
// A hash 'bucket' consists of the start index to indicate start of a hash chain
@@ -78,8 +76,6 @@ public abstract class HashTableTemplate implements HashTable {
// Placeholder for the current index while probing the hash table
private IndexPointer currentIdxHolder;
-// private FragmentContext context;
-
private BufferAllocator allocator;
// The incoming build side record batch
@@ -451,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable {
@Override
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+ public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
@@ -470,7 +466,6 @@ public abstract class HashTableTemplate implements HashTable {
}
this.htConfig = htConfig;
-// this.context = context;
this.allocator = allocator;
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;