You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by sohami <gi...@git.apache.org> on 2017/11/01 00:14:59 UTC
[GitHub] drill pull request #978: DRILL-5842: Refactor and simplify the fragment, ope...
Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/978#discussion_r148095543
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorExecContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. This class provides services common to the test-time
+ * version of the operator context and the full production-time context
+ * that includes network services.
+ */
+
+public abstract class BaseOperatorContext implements OperatorContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
+
+ protected final FragmentContextInterface context;
+ protected final BufferAllocator allocator;
+ protected final PhysicalOperator popConfig;
+ protected final BufferManager manager;
+ protected OperatorStatReceiver statsWriter;
+ private DrillFileSystem fs;
+ private ControlsInjector injector;
+
+ public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
+ PhysicalOperator popConfig,
+ OperatorStatReceiver stats) {
+ this.context = context;
+ this.allocator = allocator;
+ this.popConfig = popConfig;
+ this.manager = new BufferManagerImpl(allocator);
+ statsWriter = stats;
+ }
+
+ @Override
+ public FragmentContextInterface getFragmentContext() {
+ return context;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends PhysicalOperator> T getOperatorDefn() {
+ return (T) popConfig;
+ }
+
+ public String getName() {
+ return popConfig.getClass().getName();
+ }
+
+ @Override
+ public DrillBuf replace(DrillBuf old, int newSize) {
+ return manager.replace(old, newSize);
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer() {
+ return manager.getManagedBuffer();
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer(int size) {
+ return manager.getManagedBuffer(size);
+ }
+
+ @Override
+ public ExecutionControls getExecutionControls() {
+ return context.getExecutionControls();
+ }
+
+ @Override
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException("Operator context does not have an allocator");
+ }
+ return allocator;
+ }
+
+ // Allow an operator to use the thread pool
+ @Override
+ public ExecutorService getExecutor() {
+ return context.getDrillbitContext().getExecutor();
+ }
+
+ @Override
+ public ExecutorService getScanExecutor() {
+ return context.getDrillbitContext().getScanExecutor();
+ }
+
+ @Override
+ public ExecutorService getScanDecodeExecutor() {
+ return context.getDrillbitContext().getScanDecodeExecutor();
+ }
+
+ @Override
+ public void setInjector(ControlsInjector injector) {
+ this.injector = injector;
+ }
+
+ @Override
+ public ControlsInjector getInjector() {
+ return injector;
+ }
+
+ @Override
+ public void injectUnchecked(String desc) {
+ ExecutionControls executionControls = context.getExecutionControls();
+ if (injector != null && executionControls != null) {
+ injector.injectUnchecked(executionControls, desc);
+ }
+ }
+
+ @Override
+ public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass)
+ throws T {
+ ExecutionControls executionControls = context.getExecutionControls();
+ if (injector != null && executionControls != null) {
+ injector.injectChecked(executionControls, desc, exceptionClass);
+ }
+ }
+
+ @Override
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ manager.close();
+ } catch (RuntimeException e) {
+ ex = e;
+ }
+ try {
+ if (allocator != null) {
+ allocator.close();
+ }
+ } catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ }
+ try {
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .addContext("Failed to close the Drill file system for " + getName())
--- End diff --
how about adding context for the `exception ex` as well if it's not null
---