Posted to dev@drill.apache.org by ilooner <gi...@git.apache.org> on 2017/11/21 23:07:50 UTC

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

GitHub user ilooner opened a pull request:

    https://github.com/apache/drill/pull/1045

    DRILL-5730 Test Mocking Improvements

    ## DRILL-5730
    
    - Switched to using the interface for FragmentContext everywhere instead of passing around the concrete class.
    - Minor refactoring of FragmentContext public methods
    - Switched to using the OptionSet interface throughout the codebase instead of OptionManager
    - Renamed **FragmentContext** to **FragmentContextImpl** and renamed **FragmentContextInterface** to **FragmentContext**.
    - Removed JMockit from most unit tests in favor of Mockito. Unfortunately, it cannot be removed from some of the unit tests, which depend on it for mocking private methods and static methods (functionality that, of the two frameworks, only JMockit provides). In the future we need to refactor the code so that JMockit can be removed from these remaining tests completely.
    - Refactored some tests to use a mock class of FragmentContext
    - Removed Mockito and JMockit from some tests that were using them when there was no need for a mocking framework
    
    ## Misc
    
    - Removed commented out code and unused imports
    - Removed unnecessary modifiers from methods in interfaces
    - Fixed a bug in BootStrapContext that leaked threads
    - Fixed javadoc links that were broken
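
To illustrate why coding against an interface helps here, the following is a minimal sketch of a hand-written mock class, in the spirit of the "mock class of FragmentContext" bullet above. Everything in it is hypothetical: `SimpleFragmentContext`, `MockFragmentContext`, `BatchSizer`, and the `batch.size` option name are simplified stand-ins invented for this example and are not Drill's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for a fragment context interface.
// The real FragmentContext in Drill has many more methods.
interface SimpleFragmentContext extends AutoCloseable {
  Object getOption(String name);

  boolean shouldContinue();

  @Override
  void close();
}

// Hand-written test double: no mocking framework is needed because the
// code under test depends only on the interface.
final class MockFragmentContext implements SimpleFragmentContext {
  private final Map<String, Object> options = new HashMap<>();
  private boolean closed;

  // Fluent setter so a test can configure the mock in one expression.
  MockFragmentContext withOption(String name, Object value) {
    options.put(name, value);
    return this;
  }

  @Override
  public Object getOption(String name) {
    return options.get(name);
  }

  @Override
  public boolean shouldContinue() {
    return !closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical code under test: reads an option through the interface and
// falls back to a default when the option is unset or not an Integer.
final class BatchSizer {
  static final int DEFAULT_BATCH_SIZE = 1024;

  static int batchSize(SimpleFragmentContext context) {
    Object value = context.getOption("batch.size");
    return value instanceof Integer ? (Integer) value : DEFAULT_BATCH_SIZE;
  }
}
```

A test can then build the mock directly, for example `BatchSizer.batchSize(new MockFragmentContext().withOption("batch.size", 4096))`, without starting a Drillbit or using JMockit to intercept a concrete class.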


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/drill DRILL-5730

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1045.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1045
    
----
commit b4b4de83db5df20f2fa56387f5756df0ead3ec17
Author: Paul Rogers <pr...@maprtech.com>
Date:   2017-10-05T05:43:44Z

    DRILL-5842: Refactor fragment, operator contexts

commit 0a2d938cee7d5d47d3ac0d666ace8163efb3af83
Author: Paul Rogers <pr...@maprtech.com>
Date:   2017-10-06T06:24:56Z

    Fixes for tests which mock contexts

commit 34cd7494c68f0934fdf5f455748863be873b3995
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-16T18:28:54Z

     - Removed commented out code
     - Removed redundant modifiers on interface methods

commit a4944b20abe226a990adc775a3641b44c0b173bb
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-16T19:36:23Z

     - Some more minor code cleanup

commit 13f35109a30f03414223c84f4f4fb664ab344e6e
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-17T19:30:59Z

     - Deleted commented out code
     - Removed unused variables
     - Replaced usage of FragmentContext with FragmentContextInterface
     - Refactored OptionSet and FragmentContextInterface interfaces

commit 629da8ff3bd40b3269747cf54a88754da3266346
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-18T19:37:37Z

     - More changes to the FragmentContextInterface
     - Replaced more usages of FragmentContext with FragmentContextInterface
     - Replaced usages of OptionManager with OptionSet

commit 71f9a1c7d2c8b2f60398348d57344c56a68f556c
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-18T19:52:01Z

     - Removed unused import

commit b189350a20e3527d8b6c7df82fdb8641a359dad4
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-10-18T22:21:52Z

     - Fixed broken unit tests

commit 27f88376c7ad5da384570de0a3eafeb16393829d
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-07T19:02:43Z

     - Deleted unused fields

commit 5f3e3ce93aba98e2e20abd0a187392d38a78c374
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-09T02:48:44Z

     - Removed unused variables
     - Removed use of Jmockit from unit tests
     - Minor code cleanup

commit df4b0c1fed0f2d34292e6e635635cee4c6f2f2af
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-09T21:42:00Z

     - Fixed java-exec build and test errors

commit 8113edb320f2ff12e26bd87f82b23fd47a9513cd
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-16T00:46:34Z

     - Fixed broken test
     - Removed broken TestOptiqPlans
     - Completed replacing references to FragmentContext with FragmentContextInterface

commit b1fee4ff6e5c6dde14239732baae193ea752f21e
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-16T00:58:45Z

     - Moved TestHashJoin off of JMockit

commit f94a115eddfbbae8db861e6e97481197c9100f6c
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-16T20:52:03Z

     - Removed more dependencies on JMockit

commit d2178e262885f073cbb0d3daab1640a36f505805
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-20T20:28:29Z

     - Finished migrating most of the tests off of JMockit
     - Tests pass

commit 58ca0d1d42b931509dcff013346f2374556d5080
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-20T23:41:41Z

     - Fixed a test bug

commit c176d36625b115e7990db0bc3fbaa3e1f5dcb2f7
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-21T00:52:09Z

     - Fixed compilation issue

commit b3d13d937e56dc82ec35f6e784ec8111e7a9e4de
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-21T18:17:59Z

     - Fixed broken tests

commit 18bb70e6bb0ad8c5b87b8f6ec40e121240525d04
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-21T19:36:49Z

     - Code cleanup to prep for PR

commit f927a7b9d0540bae29e2b3fee632c7464241714b
Author: Timothy Farkas <ti...@apache.org>
Date:   2017-11-21T21:21:17Z

     - Renamed FragmentContext FragmentContextImpl and renamed FragmentContextInterface FragmentContext

----


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156534310
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
    --- End diff --
    
    Moved to ExchangeFragmentContext
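
As a rough sketch of what moving a method such as `getDataTunnel` into a separate interface can look like, the example below splits network-specific methods into a sub-interface. It assumes that the exchange interface extends the core one, which the comment above does not state. All types here (`CoreContext`, `ExchangeContext`, `LocalTestContext`, `RecordingExchangeContext`) are hypothetical simplifications, not Drill's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical core interface: only what every operator needs.
interface CoreContext {
  String getFragmentName();
}

// Hypothetical sub-interface: methods needed only by operators that
// exchange data with other nodes.
interface ExchangeContext extends CoreContext {
  void send(String endpoint, String batch);
}

// A test double for operators that never touch the network only has to
// implement the small core interface.
final class LocalTestContext implements CoreContext {
  @Override
  public String getFragmentName() {
    return "frag:0:0";
  }
}

// A test double for exchange operators records what was sent instead of
// opening a real connection.
final class RecordingExchangeContext implements ExchangeContext {
  final List<String> sent = new ArrayList<>();

  @Override
  public String getFragmentName() {
    return "frag:1:0";
  }

  @Override
  public void send(String endpoint, String batch) {
    sent.add(endpoint + "<-" + batch);
  }
}
```

The design choice this illustrates is that code which only needs `CoreContext` can be tested with a two-line stub, while the larger surface is confined to the operators that actually send data.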


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156208221
  
    --- Diff: common/src/main/java/org/apache/drill/common/config/DrillConfig.java ---
    @@ -45,13 +45,11 @@
     
       private final ImmutableList<String> startupArguments;
     
    -  public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");
    -
       @SuppressWarnings("restriction")
       private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
     
       @VisibleForTesting
    -  public DrillConfig(Config config, boolean enableServerConfigs) {
    +  public DrillConfig(Config config) {
    --- End diff --
    
    The flag is actually an unused argument. Maybe it did something at one point but the code that used it is long gone.


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    You're right. I'll mark it as a duplicate.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192365
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if everything is okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions to list or query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw authorization errors to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
    --- End diff --
    
    The three methods above are network related.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162246538
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java ---
    @@ -184,7 +184,7 @@ public RawFragmentBatch getNext() throws IOException {
           return null;
         }
     
    -    if (context.isOverMemoryLimit()) {
    +    if (context.getAllocator().isOverLimit()) {
    --- End diff --
    
    This one is fine. We must expose an allocator even for testing.
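
A minimal sketch of the point above, assuming simplified stand-in types: once the context exposes its allocator, a convenience method such as `isOverMemoryLimit()` is no longer needed on the context, and a test can supply a trivial allocator. Only the call shape `context.getAllocator().isOverLimit()` comes from the diff; `SimpleAllocator`, `SimpleContext`, `FixedLimitAllocator`, and `isUnderMemoryPressure` are hypothetical names, not Drill's `BufferAllocator` or `FragmentContext`.

```java
// Hypothetical, simplified stand-ins for illustration only; these are not
// Drill's real BufferAllocator or FragmentContext types.
final class AllocatorSketch {

  /** Minimal allocator contract: reports whether usage exceeds the limit. */
  interface SimpleAllocator {
    boolean isOverLimit();
  }

  /** Minimal context contract: exposes the allocator, with no convenience wrapper. */
  interface SimpleContext {
    SimpleAllocator getAllocator();
  }

  /** Test-time allocator with a fixed limit and manually adjusted usage. */
  static final class FixedLimitAllocator implements SimpleAllocator {
    private final long limit;
    private long allocated;

    FixedLimitAllocator(long limit) {
      this.limit = limit;
    }

    void allocate(long bytes) {
      allocated += bytes;
    }

    @Override
    public boolean isOverLimit() {
      return allocated > limit;
    }
  }

  /** Mirrors the call shape in the diff: context.getAllocator().isOverLimit(). */
  static boolean isUnderMemoryPressure(SimpleContext context) {
    return context.getAllocator().isOverLimit();
  }

  public static void main(String[] args) {
    FixedLimitAllocator allocator = new FixedLimitAllocator(100);
    // The context has a single method here, so a lambda is enough to fake it.
    SimpleContext context = () -> allocator;
    System.out.println(isUnderMemoryPressure(context)); // false: 0 bytes allocated
    allocator.allocate(150);
    System.out.println(isUnderMemoryPressure(context)); // true: 150 > 100
  }
}
```

The caller depends on one accessor rather than on one wrapper method per allocator feature, which is what keeps the test-time context small.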


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162251468
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -351,8 +603,110 @@ public OperatorStats getStats() {
         }
       }
     
    -  public OperatorContext operatorContext(PhysicalOperator config) {
    -    return new TestOperatorContext(context, allocator(), config, stats);
    +  public static class MockPhysicalOperator implements PhysicalOperator {
    --- End diff --
    
    This seems like overkill. Most of the time, a test that needs an operator will provide one. The dummy operator that was in this code (or maybe in my latest PR) handles the cases where we don't even have an operator (record batch), so any operator (definition) will do.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162250517
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java ---
    @@ -21,18 +21,14 @@
     
     import org.apache.drill.exec.ExecTest;
     import org.apache.drill.exec.proto.CoordinationProtos;
    -import org.apache.drill.exec.store.parquet.ParquetGroupScan;
     import org.apache.hadoop.fs.BlockLocation;
     import org.junit.Test;
     
     import com.google.common.collect.ImmutableRangeMap;
     import com.google.common.collect.Range;
     
     public class TestAffinityCalculator extends ExecTest {
    -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAffinityCalculator.class);
    -
    -  String port = "1234";
    -  final String path = "path";
    +  private final String port = "1234";
    --- End diff --
    
    What happened to the blocks of code that were removed? Not used? Duplicate? Or, does the unused code suggest that this test is not actually testing what it should?


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156534345
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
    --- End diff --
    
    Moved to ExchangeFragmentContext.
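
The move described here can be sketched roughly as follows, assuming `ExchangeFragmentContext` is a sub-interface that carries the network-facing accessors. Only the names `ExchangeFragmentContext`, `getDataTunnel`, and `isImpersonationEnabled` come from the thread; `BaseContext`, `ExchangeContext`, `DataTunnel`, and the `String` endpoint are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: simplified stand-ins, not Drill's actual interfaces.
final class ContextSplitSketch {

  /** Base context: what every operator needs, with no network dependencies. */
  interface BaseContext {
    boolean isImpersonationEnabled();
  }

  /** Stand-in for a data tunnel; a fake can record batches instead of sending them. */
  interface DataTunnel {
    void send(String batch);
  }

  /** Network-facing accessors live in a sub-interface (assumed shape of the split). */
  interface ExchangeContext extends BaseContext {
    DataTunnel getDataTunnel(String endpoint);
  }

  /** Code that never touches the network depends on the base interface only. */
  static String describe(BaseContext context) {
    return context.isImpersonationEnabled() ? "impersonating" : "process user";
  }

  /** A sender needs the wider interface. */
  static void sendAll(ExchangeContext context, String endpoint, List<String> batches) {
    DataTunnel tunnel = context.getDataTunnel(endpoint);
    for (String batch : batches) {
      tunnel.send(batch);
    }
  }

  public static void main(String[] args) {
    // A lambda is enough to fake the base context in a test.
    BaseContext local = () -> false;
    System.out.println(describe(local));

    // The exchange context needs a slightly larger hand-written fake.
    List<String> sent = new ArrayList<>();
    ExchangeContext exchange = new ExchangeContext() {
      @Override
      public boolean isImpersonationEnabled() {
        return false;
      }

      @Override
      public DataTunnel getDataTunnel(String endpoint) {
        return sent::add; // record each batch instead of using the network
      }
    };
    sendAll(exchange, "host-a", Arrays.asList("b1", "b2"));
    System.out.println(sent);
  }
}
```

With this shape, tests for operators that do no exchange work never have to stub tunnels or endpoints at all.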


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156181381
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -39,27 +39,29 @@
      * version of the operator context and the full production-time context
      * that includes network services.
      */
    -
     public abstract class BaseOperatorContext implements OperatorContext {
       static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
     
    -  protected final FragmentContextInterface context;
    +  protected final FragmentContext context;
       protected final BufferAllocator allocator;
       protected final PhysicalOperator popConfig;
       protected final BufferManager manager;
    +  protected OperatorStatReceiver statsWriter;
    --- End diff --
    
    Didn't I deprecate this in a recent commit? Turned out to be more trouble than it was worth. The original class could be made to work in a test environment.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156191611
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
    --- End diff --
    
    OK to add as long as the registry can be created without ZK and network dependencies. (UDF introduced such dependencies, but, I believe, they can be turned off.)


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156178698
  
    --- Diff: common/src/main/java/org/apache/drill/common/config/DrillConfig.java ---
    @@ -45,13 +45,11 @@
     
       private final ImmutableList<String> startupArguments;
     
    -  public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");
    -
       @SuppressWarnings("restriction")
       private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
     
       @VisibleForTesting
    -  public DrillConfig(Config config, boolean enableServerConfigs) {
    +  public DrillConfig(Config config) {
    --- End diff --
    
    I'm going to guess that the same `DrillConfig` is used (sadly) in the Drill client and Drill server. (Sadly because it is odd for a client application to need to provide a config file in order to connect to a Drill server. Just adds unnecessary complexity, especially for third-party apps that happen to include a Drill JDBC driver. Sigh...)
    
    In the client mode, we don't want to load all the internal server configs because we'll never use them.
    
    Not sure if this is actually how this flag has been used (maybe it was added, then forgotten.) But, it is worth checking the client code to see if it was ever used.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162513655
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java ---
    @@ -288,7 +288,7 @@ public static void populateVector(final ValueVector vector, final DrillBuf manag
         }
       }
     
    -  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionManager options) {
    +  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
    --- End diff --
    
    I'll flip to using the OptionManager.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162246482
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java ---
    @@ -172,7 +172,7 @@ public RawFragmentBatch getNext() throws IOException {
         } catch (final InterruptedException e) {
     
           // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
    -      if (!context.shouldContinue()) {
    +      if (!context.getExecutorState().shouldContinue()) {
    --- End diff --
    
    Let's think about this from a testing perspective. The context can be implemented in a way that allows testing. That is what the use of an interface allows. Tests implement the context one way, real code another way, but the interface is the same.
    
    Here, we expose the executor state. Is this also defined as an interface?
    
    If not, we are letting implementation leak out, and we won't be able to create a test-only version. It was better to leave these methods on the context itself so that their implementation is hidden and thus we can create test versions.
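
The point above can be sketched in Java. This is a minimal illustration, not Drill's actual code: only `getExecutorState()` and `shouldContinue()` appear in the diff, while `ContextView`, `MockExecutorState`, `cancel()`, and `shouldKillBuffer()` are hypothetical names introduced here. It assumes the executor state is itself an interface, so a test can substitute its own version.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorStateSketch {

  /** Assumed to be an interface; only shouldContinue() is taken from the diff. */
  public interface ExecutorState {
    boolean shouldContinue();
  }

  /** Hypothetical slice of the fragment context that exposes the state. */
  public interface ContextView {
    ExecutorState getExecutorState();
  }

  /** Hypothetical test-only state: a flag that the test flips by hand. */
  public static final class MockExecutorState implements ExecutorState {
    private final AtomicBoolean running = new AtomicBoolean(true);

    public void cancel() {
      running.set(false);
    }

    @Override
    public boolean shouldContinue() {
      return running.get();
    }
  }

  /** Mirrors the check in the diff: true means the buffer should be killed. */
  public static boolean shouldKillBuffer(ContextView context) {
    return !context.getExecutorState().shouldContinue();
  }

  public static void main(String[] args) {
    MockExecutorState state = new MockExecutorState();
    ContextView context = () -> state;
    System.out.println(shouldKillBuffer(context)); // prints false
    state.cancel();
    System.out.println(shouldKillBuffer(context)); // prints true
  }
}
```

Because both layers are interfaces, the extra `getExecutorState()` hop does not leak implementation; if the state were a concrete class, the test double above would not be possible.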


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156187558
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    --- End diff --
    
    Github is very confused by this double rename. I presume the new file just has the same contents, but a new name? Used an online diff checker to compare the two interfaces. Turns out there are important differences. Comments below.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156490496
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
    --- End diff --
    
    This is required since it's used by operators like join, hashagg, filter, and flatten. I've replaced FunctionImplementationRegistry with the FunctionLookupContext interface now. So it has no dependence on ZK or networking. 
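
A rough sketch of why a lookup-only interface removes the ZK and network dependency for tests. The real `FunctionLookupContext` signature is not shown in this thread, so `FunctionLookup`, `MapFunctionLookup`, and `evaluate()` below are hypothetical stand-ins, and integer arithmetic stands in for real UDFs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntBinaryOperator;

public class FunctionLookupSketch {

  /** Hypothetical lookup-only view of a function registry. */
  public interface FunctionLookup {
    IntBinaryOperator find(String name);
  }

  /** Hypothetical in-memory lookup for tests: no ZooKeeper, no network. */
  public static final class MapFunctionLookup implements FunctionLookup {
    private final Map<String, IntBinaryOperator> functions = new HashMap<>();

    public MapFunctionLookup register(String name, IntBinaryOperator fn) {
      functions.put(name, fn);
      return this;
    }

    @Override
    public IntBinaryOperator find(String name) {
      IntBinaryOperator fn = functions.get(name);
      if (fn == null) {
        throw new IllegalArgumentException("Unknown function: " + name);
      }
      return fn;
    }
  }

  /** Stand-in for an operator that resolves a function by name and applies it. */
  public static int evaluate(FunctionLookup lookup, String name, int left, int right) {
    return lookup.find(name).applyAsInt(left, right);
  }

  public static void main(String[] args) {
    FunctionLookup lookup = new MapFunctionLookup().register("add", Integer::sum);
    System.out.println(evaluate(lookup, "add", 2, 3)); // prints 5
  }
}
```

The operator-side code depends only on the narrow interface, so a production registry and a test map can be swapped without touching it.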


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156463711
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
    --- End diff --
    
    Thanks for the explanation. Sounds very reasonable to me. I will work on splitting things up into 3 interfaces. The FragmentContextImpl class could probably implement all three interfaces and be cast to the appropriate interface depending on where it is passed.
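
One way the proposed split could look, as a sketch only. The three interfaces are not named in this message, so `ExecutionStateView`, `SendView`, `IdentityView`, `ToyFragmentContextImpl`, and `getFragmentLabel()` are hypothetical; `waitForSendComplete()` and `shouldContinue()` are borrowed from the diffs in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitContextSketch {

  /** Hypothetical view for code that only checks whether to keep running. */
  public interface ExecutionStateView {
    boolean shouldContinue();
  }

  /** Hypothetical view for senders that wait on outgoing batches. */
  public interface SendView {
    void waitForSendComplete();
  }

  /** Hypothetical view for code that only needs to identify the fragment. */
  public interface IdentityView {
    String getFragmentLabel();
  }

  /** Toy stand-in for the implementation class: one object, three views. */
  public static final class ToyFragmentContextImpl
      implements ExecutionStateView, SendView, IdentityView {
    private final List<String> calls = new ArrayList<>();

    @Override
    public boolean shouldContinue() {
      return true;
    }

    @Override
    public void waitForSendComplete() {
      calls.add("waitForSendComplete");
    }

    @Override
    public String getFragmentLabel() {
      return "frag:toy";
    }

    public List<String> calls() {
      return calls;
    }
  }

  /** Consumer that asks only for the narrow view it needs. */
  public static String runOperator(ExecutionStateView state) {
    return state.shouldContinue() ? "running" : "stopped";
  }

  /** Another consumer, bound to a different view of the same object. */
  public static void finishSender(SendView send) {
    send.waitForSendComplete();
  }

  public static void main(String[] args) {
    ToyFragmentContextImpl impl = new ToyFragmentContextImpl();
    System.out.println(runOperator(impl)); // prints running
    finishSender(impl);
    System.out.println(impl.calls());      // prints [waitForSendComplete]

    // A test needs to supply only the one view under test.
    ExecutionStateView stopped = () -> false;
    System.out.println(runOperator(stopped)); // prints stopped
  }
}
```

No explicit cast is needed at the call sites: passing the implementation to a parameter typed as one of the interfaces narrows it automatically, and each consumer can be tested with a one-method stub.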


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162251030
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -86,37 +117,35 @@
      * <li>Multiple threads of execution.</li>
      * </ul>
      */
    -
     public class OperatorFixture extends BaseFixture implements AutoCloseable {
     
    +  public OperatorContext operatorContext(PhysicalOperator config) {
    +    return new MockOperatorContext(context, allocator(), config);
    +  }
    +
       /**
        * Builds an operator fixture based on a set of config options and system/session
        * options.
        */
    -
    -  public static class OperatorFixtureBuilder
    +  public static class Builder
    --- End diff --
    
    Your PR is accidentally undoing a name change from a prior commit. Please check if we have any other collisions in this or related files.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156459436
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -39,27 +39,29 @@
      * version of the operator context and the full production-time context
      * that includes network services.
      */
    -
     public abstract class BaseOperatorContext implements OperatorContext {
       static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
     
    -  protected final FragmentContextInterface context;
    +  protected final FragmentContext context;
       protected final BufferAllocator allocator;
       protected final PhysicalOperator popConfig;
       protected final BufferManager manager;
    +  protected OperatorStatReceiver statsWriter;
    --- End diff --
    
    Thanks for catching this. I think it slipped in when I was rebasing and resolving conflicts. I will remove it.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156534237
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
    --- End diff --
    
    Changed again. Moved this into ExecutorFragmentContext.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192043
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
    --- End diff --
    
    This introduces a dependency on cluster coordination, which makes it hard to test operators that operate "below" the level of the cluster.
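
One way to picture the concern is to split the context into a narrow view that operators see and a wider view that carries cluster-level services. The sketch below is only an illustration under assumptions: `OperatorFacingContext`, `ClusterFacingContext`, `TestContext`, `batchSizeFor` and the `batch.size` option are hypothetical names, not Drill classes or options, and the real split in this PR may look different.

```java
import java.util.HashMap;
import java.util.Map;

final class SplitContextSketch {

  /** Narrow view (hypothetical): only what an operator needs at run time. */
  interface OperatorFacingContext {
    long getOption(String name);
  }

  /** Wider view (hypothetical): adds services that exist only inside a running cluster. */
  interface ClusterFacingContext extends OperatorFacingContext {
    String getClusterCoordinatorName();
  }

  /** Stand-in for operator logic; it depends only on the narrow interface. */
  static long batchSizeFor(OperatorFacingContext context) {
    return context.getOption("batch.size") * 2;
  }

  /** Test-time implementation: no cluster and no mocking framework required. */
  static final class TestContext implements OperatorFacingContext {
    private final Map<String, Long> options = new HashMap<>();

    TestContext withOption(String name, long value) {
      options.put(name, value);
      return this;
    }

    @Override
    public long getOption(String name) {
      Long value = options.get(name);
      if (value == null) {
        throw new IllegalArgumentException("Unknown option: " + name);
      }
      return value;
    }
  }

  public static void main(String[] args) {
    // The operator logic runs against the hand-written test context.
    long size = batchSizeFor(new TestContext().withOption("batch.size", 1024L));
    System.out.println(size);
  }
}
```

Because `batchSizeFor` never sees `ClusterFacingContext`, a test can supply `TestContext` without standing up or mocking any cluster service.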


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r163449000
  
    --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java ---
    @@ -66,13 +66,12 @@
       public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
           FragmentContext context, KafkaStoragePlugin plugin) {
         setColumns(projectedColumns);
    -    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    -    this.readNumbersAsDouble = context.getOptions()
    -        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    OptionManager options = context.getOptions();
    -    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
    -    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    -    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    +    final OptionSet optionSet = context.getOptionSet();
    +    this.enableAllTextMode = optionSet.getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    +    this.readNumbersAsDouble = optionSet.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    this.unionEnabled = optionSet.getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.kafkaMsgReader = optionSet.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    +    this.kafkaPollTimeOut = optionSet.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    --- End diff --
    
    This would be a good code cleanup task to control entropy. However, it is the kind of thing that can be done incrementally over time as the need arises.
    
    Also, standardizing on session/system option names would be good: maybe `_KEY` for names, `_VALIDATOR` for validators.
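
The suffix convention suggested above could look roughly like the following. This is a sketch under assumptions: `BooleanValidator`, `Options` and the option name `example.reader.all_text_mode` are hypothetical stand-ins written for this example, not Drill's `OptionSet`, its validators, or any constant in `ExecConstants`.

```java
import java.util.HashMap;
import java.util.Map;

final class OptionNamingSketch {

  /** Hypothetical typed validator: pairs an option name with its default value. */
  static final class BooleanValidator {
    final String key;
    final boolean defaultValue;

    BooleanValidator(String key, boolean defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }
  }

  // The suffix tells the reader which kind of constant is being passed around.
  static final String ALL_TEXT_MODE_KEY = "example.reader.all_text_mode";
  static final BooleanValidator ALL_TEXT_MODE_VALIDATOR =
      new BooleanValidator(ALL_TEXT_MODE_KEY, false);

  /** Minimal stand-in for an option set. */
  static final class Options {
    private final Map<String, Boolean> values = new HashMap<>();

    /** Sets a value by option name. */
    void set(String key, boolean value) {
      values.put(key, value);
    }

    /** Typed lookup: falls back to the validator's default when the option is unset. */
    boolean get(BooleanValidator validator) {
      return values.getOrDefault(validator.key, validator.defaultValue);
    }
  }

  public static void main(String[] args) {
    Options options = new Options();
    System.out.println(options.get(ALL_TEXT_MODE_VALIDATOR));
    options.set(ALL_TEXT_MODE_KEY, true);
    System.out.println(options.get(ALL_TEXT_MODE_VALIDATOR));
  }
}
```

With the suffixes in place, a call site shows at a glance whether it is doing a name-based or a validator-based lookup.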


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156494989
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
    --- End diff --
    
    Removed


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162501868
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java ---
    @@ -59,7 +59,7 @@
     @Category(OperatorTest.class)
     public class TestSorter extends DrillTest {
     
    -  public static OperatorFixture fixture;
    +  public volatile static OperatorFixture fixture;
    --- End diff --
    
    I was running into an issue where the `fixture` variable was consistently null when the test ran. This seemed impossible, so I hypothesized that JUnit is secretly using two threads: one to call the static initializers and another to actually execute the test methods. I made the variable `volatile` and the issue went away. I agree this is weird, and it raises the question of why other tests don't have the same issue. I'll remove the `volatile` for now. If the bizarre issue surfaces again, I will get more details and post them here.
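
If the issue does come back, one cheap way to test the two-thread hypothesis is to record which thread runs the setup code and which runs the test body. The sketch below is plain Java with no JUnit dependency; `ThreadCheckSketch`, `record` and `recordOnNewThread` are hypothetical helpers, and `main` only simulates a setup step and a test step on separate threads. In a real test, `record()` would be called from the class-level setup method and from a test method.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class ThreadCheckSketch {

  // Thread-safe set of the names of every thread that called record().
  private static final Set<String> SEEN_THREADS = ConcurrentHashMap.newKeySet();

  /** Records the calling thread; call this from both setup and test code. */
  static void record() {
    SEEN_THREADS.add(Thread.currentThread().getName());
  }

  /** Runs record() on a freshly started thread and waits for it to finish. */
  static void recordOnNewThread(String threadName) {
    Thread thread = new Thread(ThreadCheckSketch::record, threadName);
    thread.start();
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Number of distinct thread names observed so far. */
  static int distinctThreads() {
    return SEEN_THREADS.size();
  }

  public static void main(String[] args) {
    record();                             // stands in for the setup step
    recordOnNewThread("simulated-test");  // stands in for a test method on another thread
    System.out.println(distinctThreads() + " distinct thread(s): " + SEEN_THREADS);
  }
}
```

If the count stays at one in the real test run, the two-thread explanation can be ruled out and the null fixture must have another cause.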


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156538150
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    --- End diff --
    
    Moved to RootFragmentContext


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156245600
  
    --- Diff: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---
    @@ -144,13 +143,12 @@ private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
         }
         MongoClient client = plugin.getClient(addresses);
         MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
    -    this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.unionEnabled = fragmentContext.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    done


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162495701
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -351,8 +603,110 @@ public OperatorStats getStats() {
         }
       }
     
    -  public OperatorContext operatorContext(PhysicalOperator config) {
    -    return new TestOperatorContext(context, allocator(), config, stats);
    +  public static class MockPhysicalOperator implements PhysicalOperator {
    --- End diff --
    
    Thanks for catching this. This class is left over from some previous work and is not used. I've deleted it.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162517423
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java ---
    @@ -59,7 +59,7 @@
     @Category(OperatorTest.class)
     public class TestSorter extends DrillTest {
     
    -  public static OperatorFixture fixture;
    +  public volatile static OperatorFixture fixture;
    --- End diff --
    
    Strange... So, if having volatile does solve a problem, just leave it in, but maybe add a comment that includes your note above so that we know what's what in the future.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156180593
  
    --- Diff: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---
    @@ -91,9 +90,9 @@ public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaP
             subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
     
         buildFilters(subScanSpec.getFilter(), mergedFilters);
    -    enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
    -    readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
    +    enableAllTextMode = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
    +    readNumbersAsDouble = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    isBsonRecordReader = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
    --- End diff --
    
    More to replace.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192227
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
    --- End diff --
    
    Network related.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162512220
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -58,23 +55,24 @@
     import com.google.common.collect.ImmutableList;
     import com.google.common.io.Resources;
     
    -public class PlanningBase extends ExecTest{
    -  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    --- End diff --
    
    This test was previously using JMockit, and I have simply replaced it with Mockito, which is Eclipse-friendly and a step forward for developers using Eclipse. So the amount of mocking going on here is the same :) The only difference is that Eclipse users should now be able to run these tests. I have also completely removed mocking from some tests that no longer need it. As we take more steps to properly use interfaces for more classes, we can incrementally remove Mockito from even more tests. However, this is an incremental process and shouldn't be done all in one shot.
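
    As a rough illustration of the last point, code that depends only on an interface can be tested with a small hand-written implementation and no mocking framework at all. The sketch below is self-contained and does not use Drill's real classes: `OptionLookup`, `ReaderConfig`, and `MapOptionLookup` are hypothetical names invented for this example; only the option key string comes from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for a context interface; not Drill's real API.
interface OptionLookup {
  boolean getBoolean(String name);
}

// Production-style code depends only on the interface, never on a concrete class.
final class ReaderConfig {
  final boolean unionEnabled;

  ReaderConfig(OptionLookup options) {
    this.unionEnabled = options.getBoolean("exec.enable_union_type");
  }
}

// Test-time implementation backed by a map; no mocking framework required.
final class MapOptionLookup implements OptionLookup {
  private final Map<String, Boolean> values = new HashMap<>();

  MapOptionLookup set(String name, boolean value) {
    values.put(name, value);
    return this;
  }

  @Override
  public boolean getBoolean(String name) {
    Boolean value = values.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Unknown option: " + name);
    }
    return value;
  }
}

public class HandWrittenMockExample {
  public static void main(String[] args) {
    ReaderConfig config = new ReaderConfig(
        new MapOptionLookup().set("exec.enable_union_type", true));
    System.out.println("unionEnabled = " + config.unionEnabled);
  }
}
```

    The trade-off is that a hand-written test double has to be kept in sync with the interface, whereas a mocking framework generates one on demand; for small interfaces the hand-written version is usually easier to read and debug.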


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r163419338
  
    --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java ---
    @@ -66,13 +66,12 @@
       public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
           FragmentContext context, KafkaStoragePlugin plugin) {
         setColumns(projectedColumns);
    -    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    -    this.readNumbersAsDouble = context.getOptions()
    -        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    OptionManager options = context.getOptions();
    -    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
    -    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    -    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    +    final OptionSet optionSet = context.getOptionSet();
    +    this.enableAllTextMode = optionSet.getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    +    this.readNumbersAsDouble = optionSet.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    this.unionEnabled = optionSet.getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.kafkaMsgReader = optionSet.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    +    this.kafkaPollTimeOut = optionSet.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    --- End diff --
    
    Should we "complete" the cleanup by going over the code and converting all the getOption(..) calls to the getTYPE(..) ones? Also, for standardization, should all the name definitions, like
    {code}
    public static final String ENABLE_UNION_TYPE_KEY = "exec.enable_union_type";
    {code}
    be changed as needed to have the suffix **_KEY**?
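
    To make the suggestion concrete, here is a minimal, self-contained sketch of what typed accessors keyed by `_KEY` constants could look like. It is not Drill's actual `OptionSet`: the `TypedOptions`, `OptionValue`, and `MapOptions` types, the accessor names `getBoolean`/`getLong`/`getString`, and the `example.poll_timeout` key are all assumptions made for illustration. Only the `bool_val`/`num_val`/`string_val` field names and the `ENABLE_UNION_TYPE_KEY` definition come from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value holder that mirrors the field names seen in the diff above.
final class OptionValue {
  final Boolean bool_val;
  final Long num_val;
  final String string_val;

  private OptionValue(Boolean boolVal, Long numVal, String stringVal) {
    this.bool_val = boolVal;
    this.num_val = numVal;
    this.string_val = stringVal;
  }

  static OptionValue ofBoolean(boolean v) { return new OptionValue(v, null, null); }
  static OptionValue ofLong(long v) { return new OptionValue(null, v, null); }
  static OptionValue ofString(String v) { return new OptionValue(null, null, v); }
}

// Hypothetical interface: one untyped lookup plus typed convenience accessors.
interface TypedOptions {
  OptionValue getOption(String name);

  default boolean getBoolean(String name) { return getOption(name).bool_val; }
  default long getLong(String name) { return getOption(name).num_val; }
  default String getString(String name) { return getOption(name).string_val; }
}

// Simple map-backed implementation, used only to exercise the sketch.
final class MapOptions implements TypedOptions {
  private final Map<String, OptionValue> values = new HashMap<>();

  MapOptions put(String name, OptionValue value) {
    values.put(name, value);
    return this;
  }

  @Override
  public OptionValue getOption(String name) {
    OptionValue value = values.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Unknown option: " + name);
    }
    return value;
  }
}

public class TypedOptionExample {
  // Key taken from the comment above; the second key is made up for this example.
  public static final String ENABLE_UNION_TYPE_KEY = "exec.enable_union_type";
  public static final String POLL_TIMEOUT_KEY = "example.poll_timeout";

  public static void main(String[] args) {
    TypedOptions options = new MapOptions()
        .put(ENABLE_UNION_TYPE_KEY, OptionValue.ofBoolean(true))
        .put(POLL_TIMEOUT_KEY, OptionValue.ofLong(200L));

    // Before: callers reach into the holder and pick the right field themselves.
    boolean viaField = options.getOption(ENABLE_UNION_TYPE_KEY).bool_val;
    // After: the accessor names the expected type at the call site.
    boolean viaAccessor = options.getBoolean(ENABLE_UNION_TYPE_KEY);

    System.out.println(viaField == viaAccessor);
    System.out.println(options.getLong(POLL_TIMEOUT_KEY));
  }
}
```

    In this sketch, asking for the wrong type fails with a NullPointerException when the null field is unboxed; a real implementation would presumably validate the option's kind and raise a clearer error.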
     


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156243416
  
    --- Diff: contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---
    @@ -117,7 +116,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
     
         disableCountOptimization = formatPluginConfig.disableCountOptimization();
         setColumns(projectedColumns);
    -    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    unionEnabled = context.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    Done


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156460769
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -39,27 +39,29 @@
      * version of the operator context and the full production-time context
      * that includes network services.
      */
    -
     public abstract class BaseOperatorContext implements OperatorContext {
       static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
     
    -  protected final FragmentContextInterface context;
    +  protected final FragmentContext context;
       protected final BufferAllocator allocator;
       protected final PhysicalOperator popConfig;
       protected final BufferManager manager;
    +  protected OperatorStatReceiver statsWriter;
       private DrillFileSystem fs;
       private ControlsInjector injector;
     
    -  public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
    -               PhysicalOperator popConfig) {
    +  public BaseOperatorContext(FragmentContext context, BufferAllocator allocator,
    +                             PhysicalOperator popConfig,
    +                             OperatorStatReceiver stats) {
         this.context = context;
         this.allocator = allocator;
         this.popConfig = popConfig;
         this.manager = new BufferManagerImpl(allocator);
    +    this.statsWriter = stats;
    --- End diff --
    
    done


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156494301
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
    --- End diff --
    
    Yeah, you're right; it's only used by the FragmentExecutor. Removed it from the interface.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156193025
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if everything is okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions to list or query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw authorization errors to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
     
    -  public IncomingBuffers getBuffers() {
    -    return buffers;
    -  }
    +  Controller getController();
     
    -  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
    -      throws OutOfMemoryException {
    -    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
    -    contexts.add(context);
    -    return context;
    -  }
    +  OperatorCreatorRegistry getOperatorCreatorRegistry();
    --- End diff --
    
    This is needed only when creating operators, not when executing a fragment, so it belongs with the network/cluster-related services.
    
    Rather than continuing to tag each item, please review the original interface class, where I did this analysis to decide which methods belong in the runtime interface and which are network/cluster/server related (and so were left in the implementation class).
    
    Since we have no design doc, perhaps we can at least document these decisions and concepts in the class header comments. (I suppose I should have done that; rereading the comment, it clearly lacks this level of detail.)
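
The split described above might be sketched roughly as follows. This is only an illustration under simplifying assumptions: the types below are stripped-down stand-ins that reuse the names from the diff, `TestFragmentContext` and `InterfaceSplitSketch` are hypothetical, and the real Drill classes carry many more methods.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a server-side service that is needed only
// while operators are being created.
class OperatorCreatorRegistry {
  private final Map<String, String> creators = new HashMap<>();

  void register(String operator, String creator) {
    creators.put(operator, creator);
  }

  String lookup(String operator) {
    return creators.get(operator);
  }
}

// Runtime view: only what an operator needs while it executes.
interface FragmentContext {
  boolean isImpersonationEnabled();
}

// Production implementation: adds setup-time, server-related services.
class FragmentContextImpl implements FragmentContext {
  private final OperatorCreatorRegistry registry;

  FragmentContextImpl(OperatorCreatorRegistry registry) {
    this.registry = registry;
  }

  @Override
  public boolean isImpersonationEnabled() {
    return false;
  }

  // Deliberately not declared on the interface.
  OperatorCreatorRegistry getOperatorCreatorRegistry() {
    return registry;
  }
}

// Test-time implementation (assumed name): no server services to set up.
class TestFragmentContext implements FragmentContext {
  @Override
  public boolean isImpersonationEnabled() {
    return true;
  }
}

class InterfaceSplitSketch {
  // Operator-style code depends on the runtime interface only.
  static String describe(FragmentContext context) {
    return context.isImpersonationEnabled() ? "impersonating" : "process-user";
  }

  public static void main(String[] args) {
    OperatorCreatorRegistry registry = new OperatorCreatorRegistry();
    registry.register("scan", "ScanBatchCreator");
    System.out.println(describe(new FragmentContextImpl(registry))); // prints process-user
    System.out.println(describe(new TestFragmentContext()));         // prints impersonating
  }
}
```

Under this arrangement, only code that builds operators would need a reference to the implementation class, which is what keeps the runtime interface small enough to reimplement in tests.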


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162512891
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java ---
    @@ -172,7 +172,7 @@ public RawFragmentBatch getNext() throws IOException {
         } catch (final InterruptedException e) {
     
           // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
    -      if (!context.shouldContinue()) {
    +      if (!context.getExecutorState().shouldContinue()) {
    --- End diff --
    
    ExecutorState is an interface. I removed those methods from the FragmentContext interface because it felt redundant to replicate the ExecutorState interface methods in the FragmentContext interface.
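
    A minimal sketch of the shape described above, using trimmed-down stand-ins rather than the real Drill types: the context hands out an `ExecutorState` instead of re-declaring its methods. `BufferSketch` and `shouldKillOnInterrupt` are hypothetical names used only for illustration.

    ```java
    // Hypothetical, trimmed-down stand-ins for the interfaces discussed above.
    interface ExecutorState {
      boolean shouldContinue();
    }

    interface FragmentContext {
      // The context exposes the state object; it does not duplicate its methods.
      ExecutorState getExecutorState();
    }

    class BufferSketch {
      private final FragmentContext context;

      BufferSketch(FragmentContext context) {
        this.context = context;
      }

      // Mirrors the check in the diff: on interrupt, kill the buffer
      // only when the fragment is no longer supposed to continue.
      boolean shouldKillOnInterrupt() {
        return !context.getExecutorState().shouldContinue();
      }
    }
    ```

    Because both stand-ins have a single method, a test can supply them as lambdas without any mocking framework.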


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162250110
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java ---
    @@ -59,7 +59,7 @@
     @Category(OperatorTest.class)
     public class TestSorter extends DrillTest {
     
    -  public static OperatorFixture fixture;
    +  public volatile static OperatorFixture fixture;
    --- End diff --
    
    Why must this be volatile? This test runs in only one thread. The allocator that the fixture provides works in only one thread...


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    Addressed review comments. Also, in an effort to accelerate the review, I removed the misc code changes, such as the deletion of unused code and variables, from a few dozen classes. I will open a separate PR with those misc code improvements.
    
    @Ben-Zvi Please take a look.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156244298
  
    --- Diff: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java ---
    @@ -33,10 +32,10 @@
     public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
       @Override
       public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
    -      List<RecordBatch> children) throws ExecutionSetupException {
    +                            List<RecordBatch> children) throws ExecutionSetupException {
         Preconditions.checkArgument(children.isEmpty());
         JdbcStoragePlugin plugin = config.getPlugin();
    -    RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
    +    RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
    --- End diff --
    
    Which file did I remove the POP config from? I can't seem to find it.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156176534
  
    --- Diff: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java ---
    @@ -33,10 +32,10 @@
     public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
       @Override
       public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
    -      List<RecordBatch> children) throws ExecutionSetupException {
    +                            List<RecordBatch> children) throws ExecutionSetupException {
         Preconditions.checkArgument(children.isEmpty());
         JdbcStoragePlugin plugin = config.getPlugin();
    -    RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
    +    RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
    --- End diff --
    
    For readers that don't need the fragment context, it is certainly helpful, for unit testing, to omit it. However, operator context seems fair game. The operator context provides the Drill file system, the physical operator definition (the "config" here) and so on. Unfortunately, it is created inside the `ScanBatch`, too late to pass to the reader constructor.
    
    The new scan framework from the "batch size" project addresses these issues; but only for those few readers that we choose to upgrade; most readers will continue to use the changes you are making here.
    
    One other point: in an earlier file, you removed the physical operator definition (AKA "POP config") from the `ScanBatch` constructor. Here you leave it. Will this cause unexpected modes in which sometimes the operator definition is available, other times not?


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    @paul-rogers 


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162229362
  
    --- Diff: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java ---
    @@ -288,7 +288,7 @@ public static void populateVector(final ValueVector vector, final DrillBuf manag
         }
       }
     
    -  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionManager options) {
    +  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
    --- End diff --
    
    Note that DRILL-6049 removes OptionSet in favor of OptionManager. (Thanks to the work you and Jyothsna did, we can now use that class in tests.)


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156245506
  
    --- Diff: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---
    @@ -91,9 +90,9 @@ public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaP
             subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
     
         buildFilters(subScanSpec.getFilter(), mergedFilters);
    -    enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
    -    readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
    +    enableAllTextMode = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
    +    readNumbersAsDouble = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    isBsonRecordReader = fragmentContext.getOptionSet().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
    --- End diff --
    
    done


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    @paul-rogers 
    
    I have split the FragmentContext into four interfaces. We discussed three, but four felt more appropriate since some methods didn't fit well in some of the contexts.
    
    - **ExecutorFragmentContext**: This is the highest level context that exposes access to everything. This is used by the FragmentExecutor and is also passed to the BatchCreators and RootCreators. 
    - **RootFragmentContext**: This is exposed to the root executors. It has a few methods only needed by the root, but doesn't have access to everything like the ExecutorFragmentContext.
    - **ExchangeFragmentContext**: This contains the network functions, and is used by the senders and receivers.
    - **FragmentContext**: This is the barebones context which has no network functions and only exposes the bare minimum needed by operators.
    
    These contexts are interfaces that inherit each other in the following way:
    
    ExecutorFragmentContext -> RootFragmentContext -> ExchangeFragmentContext -> FragmentContext
    
    There is only one implementation of the interfaces: **FragmentContextImpl**. The facade pattern is used to cast the context to the appropriate interface depending on where it's used.
    
    Let me know if you'd like to change the hierarchy or organization. Once we finalize how we want to organize things I will add javadoc.
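
    As a rough sketch of the hierarchy described above (the method names and String return types are placeholders chosen for illustration, not the real Drill signatures), the four interfaces and the single implementation could look like this:

    ```java
    // Placeholder methods only; the real interfaces expose Drill-specific types.
    interface FragmentContext {
      String getFragmentName();                // bare minimum needed by operators
    }

    interface ExchangeFragmentContext extends FragmentContext {
      String getDataTunnel(String endpoint);   // network functions for senders and receivers
    }

    interface RootFragmentContext extends ExchangeFragmentContext {
      String getUserConnection();              // assumed example of a root-only method
    }

    interface ExecutorFragmentContext extends RootFragmentContext {
      String getController();                  // assumed example of an executor-only method
    }

    // The single implementation; callers see it through the narrowest interface they need.
    class FragmentContextImpl implements ExecutorFragmentContext {
      @Override public String getFragmentName() { return "frag:0:0"; }
      @Override public String getDataTunnel(String endpoint) { return "tunnel->" + endpoint; }
      @Override public String getUserConnection() { return "user-connection"; }
      @Override public String getController() { return "controller"; }
    }

    class HierarchySketch {
      // An operator such as a sort declares only the narrowest interface.
      static String describeOperator(FragmentContext context) {
        return "operator in " + context.getFragmentName();
      }

      // A sender needs the network-capable view.
      static String describeSender(ExchangeFragmentContext context) {
        return "sending via " + context.getDataTunnel("node1");
      }
    }
    ```

    The same `FragmentContextImpl` instance can be passed to both helper methods; each one can only reach the methods its parameter type declares.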
    



---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156177751
  
    --- Diff: contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---
    @@ -117,7 +116,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
     
         disableCountOptimization = formatPluginConfig.disableCountOptimization();
         setColumns(projectedColumns);
    -    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    unionEnabled = context.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    If you are cleaning up this code, consider using a newly added feature of the option manager, the ability to get the typed result. Rather than:
    
    ```
      public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator(ENABLE_UNION_TYPE_KEY);
    ...
    unionEnabled = context.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    ```
    
    We can now do:
    ```
      public static final String ENABLE_UNION_TYPE_KEY = "exec.enable_union_type";
    ...
    unionEnabled = context.getOptionSet().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
    ```
    
    This method handles value lookup, but also enforces types. (The existing code will just trigger an NPE if types don't match.) This new form also hides the details of the option validator as recent changes have ensured that the validators are already visible to the option manager internals.
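
    To illustrate the idea of a typed accessor in isolation, here is a small self-contained sketch. `OptionSetSketch` is a hypothetical miniature, not the real option manager; it only shows lookup and type enforcement happening in one place, so a mismatch fails with a clear message.

    ```java
    // Hypothetical miniature of an option store; not the real Drill OptionSet.
    class OptionSetSketch {
      private final java.util.Map<String, Object> values = new java.util.HashMap<>();

      void set(String name, Object value) {
        values.put(name, value);
      }

      // Typed accessor: looks the value up and checks its type in one place.
      boolean getBoolean(String name) {
        Object value = values.get(name);
        if (value == null) {
          throw new IllegalArgumentException("Undefined option: " + name);
        }
        if (!(value instanceof Boolean)) {
          throw new IllegalArgumentException(
              "Option " + name + " is not a boolean: " + value.getClass().getSimpleName());
        }
        return (Boolean) value;
      }
    }
    ```

    A caller then asks for `getBoolean("exec.enable_union_type")` by key and never touches the validator or the raw value holder.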


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156186475
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -161,16 +163,13 @@ public void close() {
         }
         try {
           if (fs != null) {
    -        fs.close();
    -        fs = null;
    +          fs.close();
    --- End diff --
    
    Don't we like two space indentations?


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    LGTM - +1 .
    Does the new code also fix DRILL-5170? If so, it should be changed to a duplicate of 5730.



---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192175
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
    --- End diff --
    
    Requires tests to mock up a network connection just to test, say, sort or project, which don't care about the network. Move to the "network/cluster" version of this interface.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162246792
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -58,23 +55,24 @@
     import com.google.common.collect.ImmutableList;
     import com.google.common.io.Resources;
     
    -public class PlanningBase extends ExecTest{
    -  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    --- End diff --
    
    Suggestion: do not depend on mocking in tests. Design code so it works well in tests without mocks. Resort to mocks only when absolutely necessary. That is, we should be trying to do LESS mocking, not MORE.
    
    This way, the same test code can be used in tools, ad-hoc tests and so on without the JUnit machinery.
    
    This is a philosophical discussion, and one that should have input from more of the team.
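
    One possible shape for this, sketched with hypothetical names (`OptionSource`, `FixedOptions`, `WidthPlanner` and the option key are invented for illustration): the code under test depends on a narrow interface, and a plain in-memory implementation stands in for the server-backed one.

    ```java
    // Hypothetical narrow dependency that planning code might need.
    interface OptionSource {
      long getLong(String name);
    }

    // Plain in-memory implementation: no mocking framework required,
    // so it can be reused from JUnit, command-line tools, or ad-hoc tests.
    class FixedOptions implements OptionSource {
      private final java.util.Map<String, Long> values = new java.util.HashMap<>();

      FixedOptions with(String name, long value) {
        values.put(name, value);
        return this;
      }

      @Override
      public long getLong(String name) {
        Long value = values.get(name);
        if (value == null) {
          throw new IllegalArgumentException("Undefined option: " + name);
        }
        return value;
      }
    }

    class WidthPlanner {
      private final OptionSource options;

      WidthPlanner(OptionSource options) {
        this.options = options;
      }

      // The code under test sees only the narrow interface.
      long maxWidth(int nodeCount) {
        return options.getLong("example.max_width_per_node") * nodeCount;
      }
    }
    ```

    Nothing here needs `mock` or `when`; the trade-off is that someone has to write and maintain the small test-time implementation.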


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156534369
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if everything is okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions to list or query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw authorization errors to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
    --- End diff --
    
    Moved to ExchangeFragmentContext


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162512429
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -58,23 +55,24 @@
     import com.google.common.collect.ImmutableList;
     import com.google.common.io.Resources;
     
    -public class PlanningBase extends ExecTest{
    -  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    --- End diff --
    
    https://issues.apache.org/jira/browse/DRILL-6097 will be the next step toward removing our dependence on mocking libraries :).
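    
    As a rough illustration of what testing without a mocking library can look like, here is a minimal sketch of a hand-written test double. All names ending in `Sketch` are hypothetical, and the interface is cut down to a single method (`shouldContinue()`, which the old FragmentContext exposed); it is not necessarily what DRILL-6097 proposes.
    
    ```java
    // Hypothetical, cut-down view of a fragment context: only the cancellation check.
    interface FragmentStateSketch {
      boolean shouldContinue();
    }
    
    // Hand-written test double: plain Java, no mocking framework involved.
    // Reports "continue" a fixed number of times, then behaves as if cancelled.
    final class StubFragmentStateSketch implements FragmentStateSketch {
      private int remainingChecks;
    
      StubFragmentStateSketch(int checksBeforeCancel) {
        this.remainingChecks = checksBeforeCancel;
      }
    
      @Override
      public boolean shouldContinue() {
        return remainingChecks-- > 0;
      }
    }
    
    // Code under test depends only on the interface, so the stub can stand in.
    final class BatchLoopSketch {
      // Processes up to maxBatches, stopping early once shouldContinue() is false.
      static int run(FragmentStateSketch state, int maxBatches) {
        int processed = 0;
        while (processed < maxBatches && state.shouldContinue()) {
          processed++;
        }
        return processed;
      }
    }
    ```
    
    Because the loop takes the interface rather than a concrete class, a test can pass in the stub directly and assert on the returned count.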


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by priteshm <gi...@git.apache.org>.
Github user priteshm commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    @arina-ielchiieva, any chance this can be committed with tomorrow's batch commits? Otherwise, @ilooner will need to rebase again!


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156180422
  
    --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java ---
    @@ -66,13 +66,12 @@
       public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
           FragmentContext context, KafkaStoragePlugin plugin) {
         setColumns(projectedColumns);
    -    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    -    this.readNumbersAsDouble = context.getOptions()
    -        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    OptionManager options = context.getOptions();
    -    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
    -    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    -    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    +    final OptionSet optionSet = context.getOptionSet();
    +    this.enableAllTextMode = optionSet.getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    +    this.readNumbersAsDouble = optionSet.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    this.unionEnabled = optionSet.getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.kafkaMsgReader = optionSet.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    +    this.kafkaPollTimeOut = optionSet.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    --- End diff --
    
    See above note. Replace:
    ```
        this.kafkaPollTimeOut = optionSet.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    ```
    
    With
    ```
        kafkaPollTimeOut = optionSet.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
    ```
    
    And so on for the other options.
    
    As written, the code above will throw an NPE if the option types ever don't match what the code expects.
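    
    To make the failure mode concrete, here is a minimal sketch. It assumes the value fields are boxed types that stay null when the option is of a different type; `OptionValueSketch` and `TypedOptionsSketch` are hypothetical stand-ins, not Drill's real classes, and the error handling shown is only one way a typed accessor such as `getLong` might report a mismatch.
    
    ```java
    // Hypothetical, simplified stand-in for an option value holder.
    final class OptionValueSketch {
      final Long num_val;      // null unless the option holds a long
      final Boolean bool_val;  // null unless the option holds a boolean
    
      private OptionValueSketch(Long numVal, Boolean boolVal) {
        this.num_val = numVal;
        this.bool_val = boolVal;
      }
    
      static OptionValueSketch ofLong(long v) {
        return new OptionValueSketch(v, null);
      }
    
      static OptionValueSketch ofBoolean(boolean v) {
        return new OptionValueSketch(null, v);
      }
    }
    
    final class TypedOptionsSketch {
      // Raw field access: unboxing a null Long throws NullPointerException.
      static long rawLong(OptionValueSketch value) {
        return value.num_val;
      }
    
      // Typed accessor: checks the type first and names the offending option.
      static long getLong(String name, OptionValueSketch value) {
        if (value.num_val == null) {
          throw new IllegalArgumentException("Option " + name + " is not a long");
        }
        return value.num_val;
      }
    }
    ```
    
    With the typed accessor, a type mismatch surfaces as an error that names the option instead of a bare NPE at the call site.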


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1045


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156180661
  
    --- Diff: contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---
    @@ -144,13 +143,12 @@ private void init(MongoSubScan.MongoSubScanSpec subScanSpec) {
         }
         MongoClient client = plugin.getClient(addresses);
         MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
    -    this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.unionEnabled = fragmentContext.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    Another one


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162229669
  
    --- Diff: contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java ---
    @@ -33,10 +32,10 @@
     public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
       @Override
       public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
    -      List<RecordBatch> children) throws ExecutionSetupException {
    +                            List<RecordBatch> children) throws ExecutionSetupException {
         Preconditions.checkArgument(children.isEmpty());
         JdbcStoragePlugin plugin = config.getPlugin();
    -    RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
    +    RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
    --- End diff --
    
    Now I can't find the file either. Consider this issue resolved.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162251160
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -127,20 +156,26 @@ public OperatorFixture build() {
        * uses the same code generation mechanism as the full Drill, but
        * provide test-specific versions of various other services.
        */
    -
    -  public static class TestFragmentContext extends BaseFragmentContext {
    -
    +  public static class MockFragmentContext extends BaseFragmentContext {
    --- End diff --
    
    More collisions? Can't imagine you made this many changes here...


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156191898
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
    --- End diff --
    
    Only needed in the network version. Actually, this probably should not be part of the fragment context itself, but rather part of the mechanism that boots up a fragment. (Once the fragment is running, we won't read the plan again.)
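    
    A minimal sketch of that separation, with hypothetical names throughout (none of these types exist in the PR): the plan reader is held by the boot-time object only, and the running fragment gets the already-parsed result.
    
    ```java
    // Hypothetical boot-time dependency: turns plan text into a root operator name.
    interface PlanReaderSketch {
      String readRootOperator(String planJson);
    }
    
    // Hypothetical run-time view: exposes no plan reader at all.
    interface RunningFragmentSketch {
      String rootOperator();
    }
    
    // Boots a fragment: reads the plan once, then hands back the run-time view.
    final class FragmentBootSketch {
      private final PlanReaderSketch planReader;
    
      FragmentBootSketch(PlanReaderSketch planReader) {
        this.planReader = planReader;
      }
    
      RunningFragmentSketch start(String planJson) {
        // The plan is parsed here, before execution starts.
        final String root = planReader.readRootOperator(planJson);
        // The returned object captures only the parsed result.
        return () -> root;
      }
    }
    ```
    
    A test-time context then has no plan reader to supply, which keeps the interface that operators see smaller.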


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156538123
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if everything is okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions to list or query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw authorization errors to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
     
    -  public IncomingBuffers getBuffers() {
    -    return buffers;
    -  }
    +  Controller getController();
     
    -  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
    -      throws OutOfMemoryException {
    -    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
    -    contexts.add(context);
    -    return context;
    -  }
    +  OperatorCreatorRegistry getOperatorCreatorRegistry();
    --- End diff --
    
    Moved to ExecutorFragmentContext


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162251307
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -175,19 +210,189 @@ public DrillConfig getConfig() {
         }
     
         @Override
    -    public DrillbitContext getDrillbitContext() {
    -      throw new UnsupportedOperationException("Drillbit context not available for operator unit tests");
    +    public ExecutorService getScanDecodeExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorService getScanExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorService getExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorState getExecutorState() {
    +      return executorState;
    +    }
    +
    +    @Override
    +    public BufferAllocator getNewChildAllocator(String operatorName, int operatorId,
    +                                                long initialReservation, long maximumReservation) {
    +      return allocator.newChildAllocator(
    +        "op:" + operatorId + ":" + operatorName,
    +        initialReservation,
    +        maximumReservation);
         }
     
         @Override
    -    protected CodeCompiler getCompiler() {
    +    public ExecProtos.FragmentHandle getHandle() {
    +      return ExecProtos.FragmentHandle.newBuilder().build();
    +    }
    +
    +    @Override
    +    public BufferAllocator getAllocator() {
    +      return allocator;
    +    }
    +
    +    @Override
    +    public OperatorContext newOperatorContext(PhysicalOperator popConfig) {
    +      return mockOperatorContext;
    +    }
    +
    +    @Override
    +    public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats) {
    +      return mockOperatorContext;
    +    }
    +
    +    @Override
    +    public SchemaPlus getFullRootSchema() {
    +      return null;
    +    }
    +
    +    @Override
    +    public String getQueryUserName() {
    +      return null;
    +    }
    +
    +    @Override
    +    public String getFragIdString() {
    +      return null;
    +    }
    +
    +    @Override
    +    public CodeCompiler getCompiler() {
            return compiler;
         }
     
         @Override
         protected BufferManager getBufferManager() {
           return bufferManager;
         }
    +
    +    @Override
    +    public void close() {
    +      bufferManager.close();
    +    }
    +
    +    @Override
    +    public ContextInformation getContextInformation() {
    +      return null;
    +    }
    +
    +    @Override
    +    public PartitionExplorer getPartitionExplorer() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ValueHolder getConstantValueHolder(String value, TypeProtos.MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) {
    +      return null;
    +    }
    +  }
    +
    +  public static class MockExecutorFragmentContext extends MockFragmentContext implements ExecutorFragmentContext {
    +
    +    public MockExecutorFragmentContext(DrillConfig config, OptionManager optionManager, BufferAllocator allocator) {
    +      super(config, optionManager, allocator);
    +    }
    +
    +    @Override
    +    public PhysicalPlanReader getPlanReader() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public ClusterCoordinator getClusterCoordinator() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public CoordinationProtos.DrillbitEndpoint getForemanEndpoint() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public CoordinationProtos.DrillbitEndpoint getEndpoint() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public Collection<CoordinationProtos.DrillbitEndpoint> getBits() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public OperatorCreatorRegistry getOperatorCreatorRegistry() {
    +      return null;
    +    }
    +
    +    @Override
    +    public void setBuffers(IncomingBuffers buffers) {
    +
    +    }
    +
    +    @Override
    +    public Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections() {
    +      return null;
    +    }
    +
    +    @Override
    +    public QueryProfileStoreContext getProfileStoreContext() {
    +      return null;
    +    }
    +
    +    @Override
    +    public void waitForSendComplete() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public AccountingDataTunnel getDataTunnel(CoordinationProtos.DrillbitEndpoint endpoint) {
    --- End diff --
    
    We really don't want to expose this network-related stuff in most fragment contexts. The idea for the API I created was to separate out the operator-related stuff from the Drillbit/network stuff. Doing so minimizes coupling and mocking. Shame to see us going back the other direction...
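
    To make the separation concrete, here is a minimal sketch of the split. Every name in it (OperatorServices, ExecutorServices, TestOperatorServices, WidthReader, the "demo.width" option) is hypothetical and chosen for illustration; none of it is Drill's actual API. The point is only that code written against the narrow interface can be tested without stubbing any network method.

```java
// Hypothetical, simplified names; these are NOT Drill's actual interfaces.

/** Services an operator needs: no network or Drillbit state. */
interface OperatorServices {
  String optionValue(String name);
}

/** Adds the network-facing services that only the fragment executor needs. */
interface ExecutorServices extends OperatorServices {
  String foremanEndpoint();
}

/** Test double for operator tests: only the narrow interface must be implemented. */
final class TestOperatorServices implements OperatorServices {
  @Override
  public String optionValue(String name) {
    // "demo.width" is a made-up option name used only in this sketch.
    return "demo.width".equals(name) ? "4" : null;
  }
}

/** An operator-like class that depends only on the narrow interface. */
final class WidthReader {
  private final OperatorServices services;

  WidthReader(OperatorServices services) {
    this.services = services;
  }

  int width() {
    return Integer.parseInt(services.optionValue("demo.width"));
  }
}

final class SegregationDemo {
  public static void main(String[] args) {
    // No endpoint, tunnel, or controller has to be mocked to run this.
    System.out.println(new WidthReader(new TestOperatorServices()).width());
  }
}
```

    With this shape, a mock that implements ExecutorServices is needed only by tests of the executor itself.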


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162244374
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---
    @@ -118,7 +118,7 @@ public IterOutcome innerNext() {
         } catch(IOException ex) {
           logger.error("Failure during query", ex);
           kill(false);
    -      context.fail(ex);
    +      context.getExecutorState().fail(ex);
    --- End diff --
    
    At some point, you may want to lead a cleanup of Drill's failure reporting design. I took a crack at it a while back. We have multiple ways of reporting errors:
    
    * Throw a UserException explaining the error
    * Throw an unchecked exception and let the fragment executor guess the cause.
    * Return STOP
    * Tell the fragment executor to fail. (Are we also required to return STOP?)
    * Return OUT_OF_MEMORY status
    
    The proposal is to replace them all with a single solution: throw a UserException. Each operator catches other exceptions and translates them to UserException.
    
    Java unwinds the stack just fine; no reason for us to write code to do it via STOP.
    
    Then, the Fragment Executor calls close() on all operators. No reason to try to do this cleanup on STOP. (Even if we do, the lower-level operators won't have seen the STOP.)
    
    Since failures are hard to test, and have caused no end of problems, having multiple ways to do the same thing is really not that helpful to Drill users.
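
    A rough sketch of the proposed single path, under stated assumptions: DemoUserException stands in for Drill's UserException (whose real builder API is not reproduced here), and DemoOperator, FailingWriter and DemoFragmentExecutor are hypothetical names. The operator translates its own low-level exception, Java unwinds the stack, and the executor closes every operator in a finally block.

```java
import java.io.IOException;
import java.util.List;

// Stand-in for Drill's UserException; an assumption made for this sketch only.
final class DemoUserException extends RuntimeException {
  DemoUserException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical, minimal operator contract.
interface DemoOperator {
  boolean next();
  void close();
}

final class FailingWriter implements DemoOperator {
  boolean closed;

  @Override
  public boolean next() {
    try {
      writeBatch();
      return true;
    } catch (IOException e) {
      // The operator translates the low-level failure into the single reporting form.
      throw new DemoUserException("Failure while writing: " + e.getMessage(), e);
    }
  }

  private void writeBatch() throws IOException {
    throw new IOException("disk full");
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class DemoFragmentExecutor {
  /** Drives the root operator (index 0); closes all operators whether or not it fails. */
  static void run(List<? extends DemoOperator> operators) {
    try {
      while (operators.get(0).next()) {
        // keep pulling batches from the root operator
      }
    } finally {
      // Cleanup happens once, here, instead of being driven by a STOP status.
      for (DemoOperator op : operators) {
        op.close();
      }
    }
  }
}
```

    In this sketch there is no STOP status and no separate "tell the executor to fail" call: the exception carries the cause, and close() runs exactly once per operator.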


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162246947
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -84,28 +82,17 @@ protected void testSqlPlan(String sqlCommands) throws Exception {
         systemOptions.init();
         @SuppressWarnings("resource")
         final UserSession userSession = UserSession.Builder.newBuilder().withOptionManager(systemOptions).build();
    -    final SessionOptionManager sessionOptions = (SessionOptionManager) userSession.getOptions();
    +    final SessionOptionManager sessionOptions = userSession.getOptions();
         final QueryOptionManager queryOptions = new QueryOptionManager(sessionOptions);
         final ExecutionControls executionControls = new ExecutionControls(queryOptions, DrillbitEndpoint.getDefaultInstance());
     
    -    new NonStrictExpectations() {
    -      {
    -        dbContext.getMetrics();
    -        result = new MetricRegistry();
    -        dbContext.getAllocator();
    -        result = allocator;
    -        dbContext.getConfig();
    -        result = config;
    -        dbContext.getOptionManager();
    -        result = systemOptions;
    -        dbContext.getStoreProvider();
    -        result = provider;
    -        dbContext.getClasspathScan();
    -        result = scanResult;
    -        dbContext.getLpPersistence();
    -        result = logicalPlanPersistence;
    -      }
    -    };
    +    when(dbContext.getMetrics()).thenReturn(new MetricRegistry());
    +    when(dbContext.getAllocator()).thenReturn(allocator);
    +    when(dbContext.getConfig()).thenReturn(config);
    +    when(dbContext.getOptionManager()).thenReturn(systemOptions);
    +    when(dbContext.getStoreProvider()).thenReturn(provider);
    +    when(dbContext.getClasspathScan()).thenReturn(scanResult);
    +    when(dbContext.getLpPersistence()).thenReturn(logicalPlanPersistence);
    --- End diff --
    
    Here, we should have a test time drillbit context implementation that extends a common interface -- just as we are now doing for the fragment context.
    
    Why? We want classes to have well-defined interfaces, and want to minimize coupling. (Basic OO design rules.) It should be made very clear when some planning test needs additional info. And, if it needs that info, then the set of info should be varied to ensure all code paths are tested. That can't be done if we use one generic set of global state for all tests.
    
    This gets to philosophy also. Our tests should strive to cover all possible states. That can only be done at the unit level. And, it can only be done if we drive the tests with a wide variety of inputs.
    
    If we bolt things together and then test, we can run only a minimum of tests: just enough to show that "things work" for one or two cases. We then rely on the users for more complex test cases. That is great -- but tends to upset the users...
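
    As an illustration of a test-time context that can be varied per test, here is a minimal sketch. PlanningContext, TestPlanningContext and DemoPlanner are hypothetical names invented for this example; Drill's DrillbitContext is a concrete class with far more services than shown here.

```java
// Hypothetical narrow interface exposing only what the code under test reads.
interface PlanningContext {
  int maxWidth();
  boolean impersonationEnabled();
}

/** Test-time implementation: each test constructs exactly the state it needs. */
final class TestPlanningContext implements PlanningContext {
  private final int maxWidth;
  private final boolean impersonationEnabled;

  TestPlanningContext(int maxWidth, boolean impersonationEnabled) {
    this.maxWidth = maxWidth;
    this.impersonationEnabled = impersonationEnabled;
  }

  @Override
  public int maxWidth() {
    return maxWidth;
  }

  @Override
  public boolean impersonationEnabled() {
    return impersonationEnabled;
  }
}

/** Stand-in for planning code; it sees only the narrow interface. */
final class DemoPlanner {
  static String describe(PlanningContext ctx) {
    String user = ctx.impersonationEnabled() ? "query user" : "process user";
    return "width=" + ctx.maxWidth() + ", run as " + user;
  }
}

final class PlanningContextDemo {
  public static void main(String[] args) {
    // Drive the same code with several input combinations, not one global state.
    for (int width : new int[] {1, 8}) {
      for (boolean impersonation : new boolean[] {false, true}) {
        System.out.println(DemoPlanner.describe(new TestPlanningContext(width, impersonation)));
      }
    }
  }
}
```

    Because the interface lists only what the planner reads, a test that needs extra state has to say so explicitly, and the loop shows how cheaply the inputs can be varied.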


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162496772
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -86,37 +117,35 @@
      * <li>Multiple threads of execution.</li>
      * </ul>
      */
    -
     public class OperatorFixture extends BaseFixture implements AutoCloseable {
     
    +  public OperatorContext operatorContext(PhysicalOperator config) {
    +    return new MockOperatorContext(context, allocator(), config);
    +  }
    +
       /**
        * Builds an operator fixture based on a set of config options and system/session
        * options.
        */
    -
    -  public static class OperatorFixtureBuilder
    +  public static class Builder
    --- End diff --
    
    There are no collisions. Java has a nice syntax for handling names like this. Specifically, you can refer to the class by its qualified name **OperatorFixture.Builder** in all your variable declarations. This removes the redundancy of prefixing the name of an inner class with the name of the outer class.
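
    A small sketch of the naming point, using made-up classes (DemoFixture and OtherFixture are hypothetical and are not the real OperatorFixture): two nested classes can both be called Builder, and the qualified name keeps them apart at the use site.

```java
// Hypothetical fixture with a nested Builder; not Drill's OperatorFixture.
final class DemoFixture {
  private final String name;

  private DemoFixture(String name) {
    this.name = name;
  }

  String name() {
    return name;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String name = "default";

    Builder name(String name) {
      this.name = name;
      return this;
    }

    DemoFixture build() {
      return new DemoFixture(name);
    }
  }
}

// A second class with its own nested Builder: no collision with DemoFixture.Builder.
final class OtherFixture {
  static final class Builder {
    OtherFixture build() {
      return new OtherFixture();
    }
  }
}

final class NestedBuilderDemo {
  public static void main(String[] args) {
    // The outer class name qualifies the otherwise generic "Builder".
    DemoFixture.Builder fixtureBuilder = DemoFixture.builder().name("unit-test");
    OtherFixture.Builder otherBuilder = new OtherFixture.Builder();
    System.out.println(fixtureBuilder.build().name());
    System.out.println(otherBuilder.build() != null);
  }
}
```

    This holds only while Builder stays nested; a top-level class named Builder would lose the qualifying prefix.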


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    @paul-rogers I have fixed the merge conflicts. This is ready for review again.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162497247
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -86,37 +117,35 @@
      * <li>Multiple threads of execution.</li>
      * </ul>
      */
    -
     public class OperatorFixture extends BaseFixture implements AutoCloseable {
     
    +  public OperatorContext operatorContext(PhysicalOperator config) {
    +    return new MockOperatorContext(context, allocator(), config);
    +  }
    +
       /**
        * Builds an operator fixture based on a set of config options and system/session
        * options.
        */
    -
    -  public static class OperatorFixtureBuilder
    +  public static class Builder
    --- End diff --
    
    Yes, of course. But one of the changes was to pull this out to a new top-level class. In that case, "Builder" by itself is rather generic.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162513475
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---
    @@ -118,7 +118,7 @@ public IterOutcome innerNext() {
         } catch(IOException ex) {
           logger.error("Failure during query", ex);
           kill(false);
    -      context.fail(ex);
    +      context.getExecutorState().fail(ex);
    --- End diff --
    
    Sounds fun. Created https://issues.apache.org/jira/browse/DRILL-6098


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156460993
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -161,16 +163,13 @@ public void close() {
         }
         try {
           if (fs != null) {
    -        fs.close();
    -        fs = null;
    +          fs.close();
    --- End diff --
    
    fixed


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156181112
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -39,27 +39,29 @@
      * version of the operator context and the full production-time context
      * that includes network services.
      */
    -
     public abstract class BaseOperatorContext implements OperatorContext {
       static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
     
    -  protected final FragmentContextInterface context;
    +  protected final FragmentContext context;
    --- End diff --
    
    You are a brave man. I elected not to do this renaming because I didn't think I'd get anyone to review it. Thanks for doing it so I can do the review :-)


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162511432
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -84,28 +82,17 @@ protected void testSqlPlan(String sqlCommands) throws Exception {
         systemOptions.init();
         @SuppressWarnings("resource")
         final UserSession userSession = UserSession.Builder.newBuilder().withOptionManager(systemOptions).build();
    -    final SessionOptionManager sessionOptions = (SessionOptionManager) userSession.getOptions();
    +    final SessionOptionManager sessionOptions = userSession.getOptions();
         final QueryOptionManager queryOptions = new QueryOptionManager(sessionOptions);
         final ExecutionControls executionControls = new ExecutionControls(queryOptions, DrillbitEndpoint.getDefaultInstance());
     
    -    new NonStrictExpectations() {
    -      {
    -        dbContext.getMetrics();
    -        result = new MetricRegistry();
    -        dbContext.getAllocator();
    -        result = allocator;
    -        dbContext.getConfig();
    -        result = config;
    -        dbContext.getOptionManager();
    -        result = systemOptions;
    -        dbContext.getStoreProvider();
    -        result = provider;
    -        dbContext.getClasspathScan();
    -        result = scanResult;
    -        dbContext.getLpPersistence();
    -        result = logicalPlanPersistence;
    -      }
    -    };
    +    when(dbContext.getMetrics()).thenReturn(new MetricRegistry());
    +    when(dbContext.getAllocator()).thenReturn(allocator);
    +    when(dbContext.getConfig()).thenReturn(config);
    +    when(dbContext.getOptionManager()).thenReturn(systemOptions);
    +    when(dbContext.getStoreProvider()).thenReturn(provider);
    +    when(dbContext.getClasspathScan()).thenReturn(scanResult);
    +    when(dbContext.getLpPersistence()).thenReturn(logicalPlanPersistence);
    --- End diff --
    
    I completely agree with this in the long term. Since this code deals with the QueryContext, I did not dive into creating an interface for the QueryContext and a mock implementation of that interface. I want to limit the scope of this PR since it is already quite large, but I have created a follow-up ticket to handle the mocking of the QueryContext correctly (https://issues.apache.org/jira/browse/DRILL-6097). As an additional note, the only reason this code was changed was to take a step towards removing JMockit, which does not play nicely with Eclipse, as you have noticed. Replacing JMockit with Mockito was the easiest thing to do at this point. Once this change goes in, we can revisit the QueryContext in DRILL-6097.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162518120
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---
    @@ -58,23 +55,24 @@
     import com.google.common.collect.ImmutableList;
     import com.google.common.io.Resources;
     
    -public class PlanningBase extends ExecTest{
    -  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    --- End diff --
    
    Thanks!


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156534273
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
    --- End diff --
    
    Changed again: moved to ExecutorFragmentContext.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156191431
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
    --- End diff --
    
    The goal of the interface was to define a self-contained set of services easily used in unit tests. Said another way, code that uses the interface has controlled dependencies and can be unit tested without mocks.
    
    Introducing these network-related APIs breaks that assumption. Now, any code can access network services, creating a huge API and requiring extensive mocking.
    
    The suggestion here is to either:
    
    1. Keep network (or server) related services only in the "impl" class (the design as of the commit that introduced this interface.)
    
    2. Create a new "NetworkFragmentContext" that has the isolated service plus the network and server-related services.
    
    In either case, code that uses the original interface can be easily unit tested. Code that uses the "network/server" flavor requires the full Drillbit or extensive mocking.
    
    In general, it will only be exchange senders and receivers that need network services. All "internal" operators should be able to use the constrained version.
    
    (Would sure be great if Drill were a project that valued design documents so all of this would not just be personal opinion or word of mouth...)
    
    Doing the full review (see notes below), a better design might be:
    
    1. Runtime fragment context (used by internal operators, makes it easy to unit test.)
    2. Exchange fragment context (exposes network services, used only for exchange send and receive. Quite hard to unit test.)
    3. Fragment builder context (used to create a fragment, but never used once the fragment is up and running.)
    
    This three-part split will relieve the kitchen-sink design of the current classes while preserving the goal of the prior commit: to allow easy unit testing of internal (non-exchange) operators.
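
A minimal sketch of how such a three-part split might look, for illustration only. Every name here (`RuntimeFragmentContext`, `ExchangeFragmentContext`, `FragmentBuilderContext`, `TestRuntimeContext`, `LimitOperator`, the `"limit"` option) is hypothetical and is not taken from the Drill codebase or from this PR. The point is only that an internal operator written against the narrow interface can be exercised with a small hand-written test implementation, with no mocking framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo entry point; declared first so the file can be launched directly.
class FragmentContextSplitSketch {
  public static void main(String[] args) {
    RuntimeFragmentContext context = new TestRuntimeContext().withOption("limit", "10");
    LimitOperator operator = new LimitOperator(context);
    System.out.println(operator.apply(25));
  }
}

// 1. Narrow context for internal operators: no network or server services (assumed shape).
interface RuntimeFragmentContext {
  String getOption(String name);
  boolean shouldContinue();
}

// 2. Adds network-facing services; only exchange senders and receivers would depend on it.
interface ExchangeFragmentContext extends RuntimeFragmentContext {
  void waitForSendComplete();
}

// 3. Used while a fragment is being constructed, not once it is running.
interface FragmentBuilderContext {
  RuntimeFragmentContext buildRuntimeContext();
}

// Hand-written test-time implementation of the narrow interface.
final class TestRuntimeContext implements RuntimeFragmentContext {
  private final Map<String, String> options = new HashMap<>();

  TestRuntimeContext withOption(String name, String value) {
    options.put(name, value);
    return this;
  }

  @Override
  public String getOption(String name) {
    return options.get(name); // null when the option was never set
  }

  @Override
  public boolean shouldContinue() {
    return true;
  }
}

// An "internal" operator that depends only on the narrow interface.
final class LimitOperator {
  private final int limit;

  LimitOperator(RuntimeFragmentContext context) {
    // The sketch assumes the option is present and numeric.
    this.limit = Integer.parseInt(context.getOption("limit"));
  }

  int apply(int rowCount) {
    return Math.min(rowCount, limit);
  }
}
```

Under this assumed layout, only code that asks for `ExchangeFragmentContext` would need a running Drillbit or extensive mocking; everything else could be tested with something like `TestRuntimeContext`.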


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192269
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    --- End diff --
    
    Network related.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156186386
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---
    @@ -39,27 +39,29 @@
      * version of the operator context and the full production-time context
      * that includes network services.
      */
    -
     public abstract class BaseOperatorContext implements OperatorContext {
       static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class);
     
    -  protected final FragmentContextInterface context;
    +  protected final FragmentContext context;
       protected final BufferAllocator allocator;
       protected final PhysicalOperator popConfig;
       protected final BufferManager manager;
    +  protected OperatorStatReceiver statsWriter;
       private DrillFileSystem fs;
       private ControlsInjector injector;
     
    -  public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator,
    -               PhysicalOperator popConfig) {
    +  public BaseOperatorContext(FragmentContext context, BufferAllocator allocator,
    +                             PhysicalOperator popConfig,
    +                             OperatorStatReceiver stats) {
         this.context = context;
         this.allocator = allocator;
         this.popConfig = popConfig;
         this.manager = new BufferManagerImpl(allocator);
    +    this.statsWriter = stats;
    --- End diff --
    
    This member was removed in a recent commit. Since the test and production versions need different implementations, the derived classes manage the operator stats.
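
A minimal sketch of the arrangement described above, in which the base class holds no stats member and each derived class supplies its own. Every name here (`SketchStatsWriter`, `SketchBaseContext`, `SketchTestContext`, `SketchProductionContext`) is hypothetical and is not taken from the Drill codebase; the sketch only illustrates the general pattern.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stats sink; stands in for whatever interface receives operator stats.
interface SketchStatsWriter {
  void addLongStat(String metric, long value);
}

// Hypothetical base context: it declares no stats field of its own.
abstract class SketchBaseContext {
  // Each derived class decides which stats implementation to hand out.
  abstract SketchStatsWriter getStatsWriter();

  // Shared logic reports through whatever the subclass provides.
  void recordBatch(long rows) {
    getStatsWriter().addLongStat("rows", rows);
  }
}

// Test flavor: captures stats in a map so a test can inspect them.
final class SketchTestContext extends SketchBaseContext {
  final Map<String, Long> captured = new HashMap<>();

  @Override
  SketchStatsWriter getStatsWriter() {
    return (metric, value) -> captured.merge(metric, value, Long::sum);
  }
}

// Production flavor (assumed for illustration): keeps a running total only.
final class SketchProductionContext extends SketchBaseContext {
  long total;

  @Override
  SketchStatsWriter getStatsWriter() {
    return (metric, value) -> total += value;
  }
}

final class StatsSketch {
  public static void main(String[] args) {
    SketchTestContext context = new SketchTestContext();
    context.recordBatch(3);
    context.recordBatch(4);
    System.out.println(context.captured);
  }
}
```

Keeping the accessor abstract means the base class never needs to know which stats implementation is in play, which is one way to let test and production code diverge without conditionals.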


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162495937
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -127,20 +156,26 @@ public OperatorFixture build() {
        * uses the same code generation mechanism as the full Drill, but
        * provide test-specific versions of various other services.
        */
    -
    -  public static class TestFragmentContext extends BaseFragmentContext {
    -
    +  public static class MockFragmentContext extends BaseFragmentContext {
    --- End diff --
    
    I like prefixing Mock classes with **Mock** instead of **Test**.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156245196
  
    --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java ---
    @@ -66,13 +66,12 @@
       public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
           FragmentContext context, KafkaStoragePlugin plugin) {
         setColumns(projectedColumns);
    -    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    -    this.readNumbersAsDouble = context.getOptions()
    -        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    -    OptionManager options = context.getOptions();
    -    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
    -    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    -    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    +    final OptionSet optionSet = context.getOptionSet();
    +    this.enableAllTextMode = optionSet.getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
    +    this.readNumbersAsDouble = optionSet.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
    +    this.unionEnabled = optionSet.getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    this.kafkaMsgReader = optionSet.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
    +    this.kafkaPollTimeOut = optionSet.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
    --- End diff --
    
    done


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156179325
  
    --- Diff: contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---
    @@ -117,7 +116,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
     
         disableCountOptimization = formatPluginConfig.disableCountOptimization();
         setColumns(projectedColumns);
    -    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    unionEnabled = context.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    Kind of funny that you have converted all this code to use `OptionSet`. I was in the process of deprecating that class since the `OptionManager` seemed to work in tests. (The previous dependencies on the Drill server had been removed.)
    
    But, since you've done all this work, we might as well embrace `OptionSet` as the read-only view of options used by fragments and operators. `OptionManager` is used only by code that alters options.
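
The split described above can be sketched roughly as follows: consumers receive a read-only view, while code that alters options works with a wider interface. The names `ReadOnlyOptions`, `MutableOptions`, `InMemoryOptions`, and `SketchReader`, and the option key `sketch.enable_union`, are hypothetical stand-ins and are not Drill's actual `OptionSet` or `OptionManager` signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view: all that a fragment or operator needs.
interface ReadOnlyOptions {
  boolean getBoolean(String name);
}

// Hypothetical mutable interface: extends the read-only view with a setter.
interface MutableOptions extends ReadOnlyOptions {
  void setBoolean(String name, boolean value);
}

// Simple in-memory implementation, used here only for illustration.
final class InMemoryOptions implements MutableOptions {
  private final Map<String, Boolean> values = new HashMap<>();

  @Override
  public boolean getBoolean(String name) {
    // Unset options default to false in this sketch.
    return values.getOrDefault(name, false);
  }

  @Override
  public void setBoolean(String name, boolean value) {
    values.put(name, value);
  }
}

// Stand-in for a record reader: its constructor accepts only the read-only view,
// so it cannot alter options even though the underlying object is mutable.
final class SketchReader {
  final boolean unionEnabled;

  SketchReader(ReadOnlyOptions options) {
    this.unionEnabled = options.getBoolean("sketch.enable_union");
  }
}

final class OptionViewSketch {
  public static void main(String[] args) {
    InMemoryOptions options = new InMemoryOptions();
    options.setBoolean("sketch.enable_union", true);
    System.out.println(new SketchReader(options).unionEnabled);
  }
}
```

Because the reader's parameter type is the narrow interface, a test can pass any small implementation of it without constructing a full option manager.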


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162499201
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java ---
    @@ -21,18 +21,14 @@
     
     import org.apache.drill.exec.ExecTest;
     import org.apache.drill.exec.proto.CoordinationProtos;
    -import org.apache.drill.exec.store.parquet.ParquetGroupScan;
     import org.apache.hadoop.fs.BlockLocation;
     import org.junit.Test;
     
     import com.google.common.collect.ImmutableRangeMap;
     import com.google.common.collect.Range;
     
     public class TestAffinityCalculator extends ExecTest {
    -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAffinityCalculator.class);
    -
    -  String port = "1234";
    -  final String path = "path";
    +  private final String port = "1234";
    --- End diff --
    
     - Two tests were blatantly commented out.
       - testSetEndpointBytes
       - testApplyAssignments
     - Several methods were unused according to IntelliJ and the java compiler.
       - buildRowGroups
       - buildEndpoints
       - buildBlockLocations2
    
    After removing the unused code there is shockingly almost nothing left. There are basically no functioning unit tests for the AffinityCalculator.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162495593
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -175,19 +210,189 @@ public DrillConfig getConfig() {
         }
     
         @Override
    -    public DrillbitContext getDrillbitContext() {
    -      throw new UnsupportedOperationException("Drillbit context not available for operator unit tests");
    +    public ExecutorService getScanDecodeExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorService getScanExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorService getExecutor() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ExecutorState getExecutorState() {
    +      return executorState;
    +    }
    +
    +    @Override
    +    public BufferAllocator getNewChildAllocator(String operatorName, int operatorId,
    +                                                long initialReservation, long maximumReservation) {
    +      return allocator.newChildAllocator(
    +        "op:" + operatorId + ":" + operatorName,
    +        initialReservation,
    +        maximumReservation);
         }
     
         @Override
    -    protected CodeCompiler getCompiler() {
    +    public ExecProtos.FragmentHandle getHandle() {
    +      return ExecProtos.FragmentHandle.newBuilder().build();
    +    }
    +
    +    @Override
    +    public BufferAllocator getAllocator() {
    +      return allocator;
    +    }
    +
    +    @Override
    +    public OperatorContext newOperatorContext(PhysicalOperator popConfig) {
    +      return mockOperatorContext;
    +    }
    +
    +    @Override
    +    public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats) {
    +      return mockOperatorContext;
    +    }
    +
    +    @Override
    +    public SchemaPlus getFullRootSchema() {
    +      return null;
    +    }
    +
    +    @Override
    +    public String getQueryUserName() {
    +      return null;
    +    }
    +
    +    @Override
    +    public String getFragIdString() {
    +      return null;
    +    }
    +
    +    @Override
    +    public CodeCompiler getCompiler() {
            return compiler;
         }
     
         @Override
         protected BufferManager getBufferManager() {
           return bufferManager;
         }
    +
    +    @Override
    +    public void close() {
    +      bufferManager.close();
    +    }
    +
    +    @Override
    +    public ContextInformation getContextInformation() {
    +      return null;
    +    }
    +
    +    @Override
    +    public PartitionExplorer getPartitionExplorer() {
    +      return null;
    +    }
    +
    +    @Override
    +    public ValueHolder getConstantValueHolder(String value, TypeProtos.MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) {
    +      return null;
    +    }
    +  }
    +
    +  public static class MockExecutorFragmentContext extends MockFragmentContext implements ExecutorFragmentContext {
    +
    +    public MockExecutorFragmentContext(DrillConfig config, OptionManager optionManager, BufferAllocator allocator) {
    +      super(config, optionManager, allocator);
    +    }
    +
    +    @Override
    +    public PhysicalPlanReader getPlanReader() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public ClusterCoordinator getClusterCoordinator() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public CoordinationProtos.DrillbitEndpoint getForemanEndpoint() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public CoordinationProtos.DrillbitEndpoint getEndpoint() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public Collection<CoordinationProtos.DrillbitEndpoint> getBits() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public OperatorCreatorRegistry getOperatorCreatorRegistry() {
    +      return null;
    +    }
    +
    +    @Override
    +    public void setBuffers(IncomingBuffers buffers) {
    +
    +    }
    +
    +    @Override
    +    public Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections() {
    +      return null;
    +    }
    +
    +    @Override
    +    public QueryProfileStoreContext getProfileStoreContext() {
    +      return null;
    +    }
    +
    +    @Override
    +    public void waitForSendComplete() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public AccountingDataTunnel getDataTunnel(CoordinationProtos.DrillbitEndpoint endpoint) {
    --- End diff --
    
    Agreed. This is not exposed in the FragmentContext used by operators; it is only exposed in the **ExecutorFragmentContext**, which is used by the FragmentExecutor and passed to the BatchCreators. Unfortunately, we still need a simple mock of the ExecutorFragmentContext because BatchCreators are used in PhysicalOpUnitTestBase. The networking methods aren't used in that test, so we don't have to provide functional mocks of them.
    
    Since PhysicalOpUnitTestBase is the only test that requires an ExecutorFragmentContext, I will move this mock class into PhysicalOpUnitTestBase and add a comment that it should not be used outside of PhysicalOpUnitTestBase. All the other tests can get away with FragmentContext, which does not have any networking methods. Eventually we should remove PhysicalOpUnitTestBase in favor of the fixture classes you added, and we will then be able to delete this mock class entirely.
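
As a rough illustration of the layering discussed in this comment, the sketch below uses a narrow context for operators and a wider one that adds networking, with a mock of the wider one that fails loudly on networking calls. All type and method names in it are hypothetical simplifications; the real FragmentContext and ExecutorFragmentContext interfaces have many more methods.

```java
// Hypothetical narrow context: what operators see. It has no networking methods.
interface SketchFragmentContext {
  String getFragmentId();
}

// Hypothetical wider context: adds networking for the executor layer.
interface SketchExecutorFragmentContext extends SketchFragmentContext {
  void waitForSendComplete();
}

// Mock of the narrow context; this is enough for operator-level tests.
class MockSketchFragmentContext implements SketchFragmentContext {
  @Override
  public String getFragmentId() {
    return "mock-fragment";
  }
}

// Mock of the wider context. Networking calls throw, so a test that
// accidentally depends on them fails immediately instead of silently passing.
final class MockSketchExecutorFragmentContext extends MockSketchFragmentContext
    implements SketchExecutorFragmentContext {
  @Override
  public void waitForSendComplete() {
    throw new UnsupportedOperationException("networking is not available in this mock");
  }
}

// Stand-in for an operator: it depends only on the narrow interface.
final class SketchOperator {
  private final SketchFragmentContext context;

  SketchOperator(SketchFragmentContext context) {
    this.context = context;
  }

  String describe() {
    return "operator in " + context.getFragmentId();
  }
}

final class ContextSplitSketch {
  public static void main(String[] args) {
    // The operator works with either mock, since both satisfy the narrow interface.
    System.out.println(new SketchOperator(new MockSketchFragmentContext()).describe());
    System.out.println(new SketchOperator(new MockSketchExecutorFragmentContext()).describe());
  }
}
```

Throwing from the unsupported methods, rather than returning null, is a design choice in this sketch: it surfaces unintended use of the mock at the call site.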


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156243457
  
    --- Diff: contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---
    @@ -117,7 +116,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
     
         disableCountOptimization = formatPluginConfig.disableCountOptimization();
         setColumns(projectedColumns);
    -    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
    +    unionEnabled = context.getOptionSet().getOption(ExecConstants.ENABLE_UNION_TYPE);
    --- End diff --
    
    Sounds good


---

[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1045
  
    Rebased and resolved conflicts. @Ben-Zvi, please take a look.


---

[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r162521845
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---
    @@ -86,37 +117,35 @@
      * <li>Multiple threads of execution.</li>
      * </ul>
      */
    -
     public class OperatorFixture extends BaseFixture implements AutoCloseable {
     
    +  public OperatorContext operatorContext(PhysicalOperator config) {
    +    return new MockOperatorContext(context, allocator(), config);
    +  }
    +
       /**
        * Builds an operator fixture based on a set of config options and system/session
        * options.
        */
    -
    -  public static class OperatorFixtureBuilder
    +  public static class Builder
    --- End diff --
    
    It's currently a nested class in the master branch (https://github.com/apache/drill/blob/master/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java), so I didn't change that in this PR. Did you want me to pull it out into a top-level class as part of this PR?


---