You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by paul-rogers <gi...@git.apache.org> on 2018/02/13 06:30:33 UTC

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

GitHub user paul-rogers opened a pull request:

    https://github.com/apache/drill/pull/1121

    DRILL-6153: Operator framework

    Includes the core files for the operator framework revision. See [this writeup](https://github.com/paul-rogers/drill/wiki/BH-Operator-Framework) for details.
    
    In this commit, nothing depends on this new code. It is, instead, the foundation for the revised scan operator to be added after the revised result set loader code is committed. Doing this commit now allows small PRs to be done in parallel.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/paul-rogers/drill DRILL-6153

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1121
    
----
commit 0e52bf4fc7bacf41565ca9a1055219bce3c279fe
Author: Paul Rogers <pr...@...>
Date:   2018-02-13T06:27:23Z

    DRILL-6153: Operator framework

----


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169203509
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
    +    this.opContext = opContext;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    --- End diff --
    
    Added a `CANCELED` state, but nothing ever reads that state. The point of `FAILED` is just to avoid confusion when calling `next()` after a failure or cancellation.
    
    We definitely *do not* want cancellation to move to the `CLOSED` state. This is a bug that exists in several operators. If `cancel()` closes the operator, then the operator is closed twice: once when the downstream operator says it wants no more rows, and a second time when the fragment executor does the real close.
    
    So, the `FAILED` state marks that a) we won't return any more rows, but that b) `close()` has not been called yet. The new `CANCELED` state has the same semantics.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169162527
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import java.util.Collections;
    +import java.util.Iterator;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +public class VectorContainerAccessor implements BatchAccessor {
    +
    +  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
    +
    +    private SelectionVector2 sv2;
    +
    +    public void setSelectionVector(SelectionVector2 sv2) {
    +      this.sv2 = sv2;
    +    }
    +
    +    @Override
    +    public SelectionVector2 getSelectionVector2() {
    +      return sv2;
    +    }
    +  }
    +
    +  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
    +
    +    private SelectionVector4 sv4;
    +
    +    @Override
    +    public SelectionVector4 getSelectionVector4() {
    +      return sv4;
    +    }
    +  }
    +
    +  private VectorContainer container;
    +  private SchemaTracker schemaTracker = new SchemaTracker();
    +
    +  /**
    +   * Set the vector container. Done initially, and any time the schema of
    +   * the container may have changed. May be called with the same container
    +   * as the previous call, or a different one. A schema change occurs
    +   * unless the vectors are identical across the two containers.
    +   *
    +   * @param container the container that holds vectors to be sent
    +   * downstream
    +   */
    +
    +  public void setContainer(VectorContainer container) {
    +    this.container = container;
    +    if (container != null) {
    +      schemaTracker.trackSchema(container);
    +    }
    +  }
    +
    +  @Override
    +  public BatchSchema getSchema() {
    +    return container == null ? null : container.getSchema();
    +  }
    +
    +  @Override
    +  public int schemaVersion() { return schemaTracker.schemaVersion(); }
    +
    +  @Override
    +  public int getRowCount() {
    +    return container == null ? 0 : container.getRecordCount();
    +  }
    +
    +  @Override
    +  public VectorContainer getOutgoingContainer() { return container; }
    +
    +  @Override
    +  public TypedFieldId getValueVectorId(SchemaPath path) {
    +    return container.getValueVectorId(path);
    +  }
    +
    +  @Override
    +  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
    +    return container.getValueAccessorById(clazz, ids);
    +  }
    +
    +  @Override
    +  public WritableBatch getWritableBatch() {
    +    return WritableBatch.get(container);
    +  }
    +
    +  @Override
    +  public SelectionVector2 getSelectionVector2() {
    +    // Throws an exception by default
    --- End diff --
    
    Should we make that explicit by indicating which exceptions it might throw?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169203498
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
    +    this.opContext = opContext;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    --- End diff --
    
    Fixed.


---

[GitHub] drill issue #1121: DRILL-6153: Operator framework

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/drill/pull/1121
  
    Hi @paul-rogers, nice to see this code being encapsulated, standardized, and unit tested. There seems to be a Travis test failure with your changes though.
    
    ```
    Failed tests: 
      TestOperatorRecordBatch.testNormalLifeCycle:156 expected null, but was:<or...@45f1feca>
    ```
    



---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168677109
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.exec.ops.OperatorContext;
    +
    +/**
    + * Core protocol for a Drill operator execution.
    + *
    + * <h4>Lifecycle</h4>
    + *
    + * <ul>
    + * <li>Creation via an operator-specific constructor in the
    + * corresponding <tt>RecordBatchCreator</tt>.</li>
    + * <li><tt>bind()</tt> called to provide the operator services.</li>
    + * <li><tt>buildSchema()</tt> called to define the schema before
    + * fetching the first record batch.</li>
    + * <li><tt>next()</tt> called repeatedly to prepare each new record
    + * batch until EOF or until cancellation.</li>
    + * <li><tt>cancel()</tt> called if the operator should quit early.</li>
    + * <li><tt>close()</tt> called to release resources. Note that
    + * <tt>close()</tt> is called in response to:<ul>
    + *   <li>EOF</li>
    + *   <li>After <tt>cancel()</tt></li>
    + *   <li>After an exception is thrown.</li></ul></li>
    + * </ul>
    + *
    + * <h4>Error Handling</h4>
    + *
    + * Any method can throw an (unchecked) exception. (Drill does not use
    + * checked exceptions.) Preferably, the code will throw a
    + * <tt>UserException</tt> that explains the error to the user. If any
    + * other kind of exception is thrown, then the enclosing class wraps it
    + * in a generic <tt>UserException</tt> that indicates that "something went
    + * wrong", which is less than ideal.
    + *
    + * <h4>Result Set</h4>
    + * The operator "publishes" a result set in response to returning
    + * <tt>true</tt> from <tt>next()</tt> by populating a
    + * {@link BatchAccessor} provided via {@link #batchAccessor()}. For
    + * compatibility with other Drill operators, the set of vectors within
    + * the batch must be the same from one batch to the next.
    + */
    +
    +public interface OperatorExec {
    +
    +  /**
    +   * Bind this operator to the context. The context provides access
    +   * to per-operator, per-fragment and per-Drillbit services.
    +   * Also provides access to the operator definition (AKA "pop
    +   * config") for this operator.
    +   *
    +   * @param context operator context
    +   */
    +
    +  public void bind(OperatorContext context);
    +
    +  /**
    +   * Provides a generic access mechanism to the batch's output data.
    +   * This method is called after a successful return from
    +   * {@link #buildSchema()} and {@link #next()}. The batch itself
    +   * can be held in a standard {@link VectorContainer}, or in some
    +   * other structure more convenient for this operator.
    +   *
    +   * @return the access for the batch's output container
    +   */
    +
    +  BatchAccessor batchAccessor();
    +
    +  /**
    +   * Retrieves the schema of the batch before the first actual batch
    +   * of data. The schema is returned via an empty batch (no rows,
    +   * only schema) from {@link #batchAccessor()}.
    +   *
    +   * @return true if a schema is available, false if the operator
    +   * reached EOF before a schema was found
    +   */
    +
    +  boolean buildSchema();
    +
    +  /**
    +   * Retrieves the next batch of data. The data is returned via
    +   * the {@link #batchAccessor()} method.
    +   *
    +   * @return true if another batch of data is available, false if
    +   * EOF was reached and no more data is available
    +   */
    +
    +  boolean next();
    +
    +  /**
    +   * Alerts the operator that the query was cancelled. Generally
    +   * optional, but allows the operator to realize that a cancellation
    --- End diff --
    
    An operator will work if it ignores `cancel()`. Its `close()` method should "do the right thing" in each of these cases:
    
    * If the operator reached EOF
    * If the operator did not reach EOF (still rows left)
    * If the operator failed
    
    The `cancel()` call is just a hint that "I won't be reading any more rows; feel free to release resources now if you like."


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169155210
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
    +    this.opContext = opContext;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    --- End diff --
    
    Alignment: `default:` is indented one space more than the `case` labels above it.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168675063
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    --- End diff --
    
    Next, the question about the `END` and `FAILED` states. `END` means we reached EOF and have no data left to deliver. Calling `next()` in the `END` state simply returns `NONE` (there is *still* no more data.)
    
    On the other hand, `FAILED` indicates that an error occurred. This seemed like a useful thing to know. But, as I review this version of the code, I see we are not actually using this state. So, rather than add a `CANCELLED` state, maybe we can collapse `FAILED` into `END`, which is just a signal to `next()` to return `NONE`.
    
    Cleanup should be no different in the three cases: we must release all resources regardless of the reason that `close()` is called.
    
    Thoughts?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168317696
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    --- End diff --
    
    Typo? `opServices` instead of `opServicees`?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168324742
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    +    this.opContext = opServicees;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    +    }
    +  }
    +
    + /**
    +   * Start the operator executor. Bind it to the various contexts.
    +   * Then start the executor and fetch the first schema.
    +   * @return result of the first batch, which should contain
    +   * only a schema, or EOF
    +   */
    +
    +  private IterOutcome start() {
    +    state = State.SCHEMA;
    +    if (operatorExec.buildSchema()) {
    +      schemaVersion = batchAccessor.schemaVersion();
    +      state = State.RUN;
    +      return IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      state = State.END;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Fetch a record batch, detecting EOF and a new schema.
    +   * @return the <tt>IterOutcome</tt> for the above cases
    +   */
    +
    +  private IterOutcome doNext() {
    +    if (! operatorExec.next()) {
    +      state = State.END;
    +      return IterOutcome.NONE;
    +    }
    +    int newVersion = batchAccessor.schemaVersion();
    +    if (newVersion != schemaVersion) {
    +      schemaVersion = newVersion;
    +      return IterOutcome.OK_NEW_SCHEMA;
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Implement a cancellation, and ignore any exception that is
    +   * thrown. We're already in trouble here, no need to keep track
    +   * of additional things that go wrong.
    +   */
    +
    +  private void cancelSilently() {
    +    try {
    +      if (state == State.SCHEMA || state == State.RUN) {
    +        operatorExec.cancel();
    +      }
    +    } catch (Throwable t) {
    +      // Ignore; we're already in a bad state.
    +      OperatorRecordBatch.logger.error("Exception thrown from cancel() for " + operatorLabel(), t);
    +    }
    +  }
    +
    +  private String operatorLabel() {
    +    return operatorExec.getClass().getCanonicalName();
    +  }
    +
    +  public void close() {
    +    if (state == State.CLOSED) {
    --- End diff --
    
    Should we also check for the FAILED and END states here?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169159418
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
    +    this.opContext = opContext;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    --- End diff --
    
    I am thinking FAILED represents an internal failure within the operator. Cancel means we are explicitly canceling it (for whatever reason), i.e., the operator is being asked to shut down or close. For cancel, should we move the state to CLOSED instead of FAILED?


---

[GitHub] drill issue #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/1121
  
    @arina-ielchiieva, can you do a committer review of this one? 


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169203884
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import java.util.Collections;
    +import java.util.Iterator;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +public class VectorContainerAccessor implements BatchAccessor {
    +
    +  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
    +
    +    private SelectionVector2 sv2;
    +
    +    public void setSelectionVector(SelectionVector2 sv2) {
    +      this.sv2 = sv2;
    +    }
    +
    +    @Override
    +    public SelectionVector2 getSelectionVector2() {
    +      return sv2;
    +    }
    +  }
    +
    +  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
    +
    +    private SelectionVector4 sv4;
    +
    +    @Override
    +    public SelectionVector4 getSelectionVector4() {
    +      return sv4;
    +    }
    +  }
    +
    +  private VectorContainer container;
    +  private SchemaTracker schemaTracker = new SchemaTracker();
    +
    +  /**
    +   * Set the vector container. Done initially, and any time the schema of
    +   * the container may have changed. May be called with the same container
    +   * as the previous call, or a different one. A schema change occurs
    +   * unless the vectors are identical across the two containers.
    +   *
    +   * @param container the container that holds vectors to be sent
    +   * downstream
    +   */
    +
    +  public void setContainer(VectorContainer container) {
    +    this.container = container;
    +    if (container != null) {
    +      schemaTracker.trackSchema(container);
    +    }
    +  }
    +
    +  @Override
    +  public BatchSchema getSchema() {
    +    return container == null ? null : container.getSchema();
    +  }
    +
    +  @Override
    +  public int schemaVersion() { return schemaTracker.schemaVersion(); }
    +
    +  @Override
    +  public int getRowCount() {
    +    return container == null ? 0 : container.getRecordCount();
    +  }
    +
    +  @Override
    +  public VectorContainer getOutgoingContainer() { return container; }
    +
    +  @Override
    +  public TypedFieldId getValueVectorId(SchemaPath path) {
    +    return container.getValueVectorId(path);
    +  }
    +
    +  @Override
    +  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
    +    return container.getValueAccessorById(clazz, ids);
    +  }
    +
    +  @Override
    +  public WritableBatch getWritableBatch() {
    +    return WritableBatch.get(container);
    +  }
    +
    +  @Override
    +  public SelectionVector2 getSelectionVector2() {
    +    // Throws an exception by default
    --- End diff --
    
    We could. The reason that it throws an exception is that, if the batch has no selection vector, yet we ask for it, it is an error under Drill semantics. Said another way, the client should ask for a selection vector only if one is available. This is existing behavior; this method simply wraps that existing behavior.
    
    So, the place to document the exception is on the `VectorContainer` method.
    
    Improved the comment a bit.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1121


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169203505
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    --- End diff --
    
    Fixed.


---

[GitHub] drill issue #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/1121
  
    Rebased on latest master.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168331779
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    --- End diff --
    
    Do we need the SCHEMA state, given that you are moving from START to RUN, END or FAILED? Maybe combine the START and SCHEMA states into, say, GET_SCHEMA or something like that? Also, would it be good to have two states, FAILED and CANCELLED, to differentiate whether the query was cancelled or failed due to an error?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168325261
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.exec.ops.OperatorContext;
    +
    +/**
    + * Core protocol for a Drill operator execution.
    + *
    + * <h4>Lifecycle</h4>
    + *
    + * <ul>
    + * <li>Creation via an operator-specific constructor in the
    + * corresponding <tt>RecordBatchCreator</tt>.</li>
    + * <li><tt>bind()</tt> called to provide the operator services.</li>
    + * <li><tt>buildSchema()</tt> called to define the schema before
    + * fetching the first record batch.</li>
    + * <li><tt>next()</tt> called repeatedly to prepare each new record
    + * batch until EOF or until cancellation.</li>
    + * <li><tt>cancel()</tt> called if the operator should quit early.</li>
    + * <li><tt>close()</tt> called to release resources. Note that
    + * <tt>close()</tt> is called in response to:<ul>
    + *   <li>EOF</li>
    + *   <li>After <tt>cancel()</tt></li>
    + *   <li>After an exception is thrown.</li></ul></li>
    + * </ul>
    + *
    + * <h4>Error Handling</h4>
    + *
    + * Any method can throw an (unchecked) exception. (Drill does not use
    + * checked exceptions.) Preferably, the code will throw a
    + * <tt>UserException</tt> that explains the error to the user. If any
    + * other kind of exception is thrown, then the enclosing class wraps it
    + * in a generic <tt>UserException</tt> that indicates that "something went
    + * wrong", which is less than ideal.
    + *
    + * <h4>Result Set</h4>
    + * The operator "publishes" a result set in response to returning
    + * <tt>true</tt> from <tt>next()</tt> by populating a
    + * {@link BatchAccesor} provided via {@link #batchAccessor()}. For
    + * compatibility with other Drill operators, the set of vectors within
    + * the batch must be the same from one batch to the next.
    + */
    +
    +public interface OperatorExec {
    +
    +  /**
    +   * Bind this operator to the context. The context provides access
    +   * to per-operator, per-fragment and per-Drillbit services.
    +   * Also provides access to the operator definition (AKA "pop
    +   * config") for this operator.
    +   *
    +   * @param context operator context
    +   */
    +
    +  public void bind(OperatorContext context);
    +
    +  /**
    +   * Provides a generic access mechanism to the batch's output data.
    +   * This method is called after a successful return from
    +   * {@link #buildSchema()} and {@link #next()}. The batch itself
    +   * can be held in a standard {@link VectorContainer}, or in some
    +   * other structure more convenient for this operator.
    +   *
    +   * @return the access for the batch's output container
    +   */
    +
    +  BatchAccessor batchAccessor();
    +
    +  /**
    +   * Retrieves the schema of the batch before the first actual batch
    +   * of data. The schema is returned via an empty batch (no rows,
    +   * only schema) from {@link #batchAccessor()}.
    +   *
    +   * @return true if a schema is available, false if the operator
    +   * reached EOF before a schema was found
    +   */
    +
    +  boolean buildSchema();
    +
    +  /**
    +   * Retrieves the next batch of data. The data is returned via
    +   * the {@link #batchAccessor()} method.
    +   *
    +   * @return true if another batch of data is available, false if
    +   * EOF was reached and no more data is available
    +   */
    +
    +  boolean next();
    +
    +  /**
    +   * Alerts the operator that the query was cancelled. Generally
    +   * optional, but allows the operator to realize that a cancellation
    --- End diff --
    
    Why is this optional?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168676504
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    --- End diff --
    
    Fixed.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168674650
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    --- End diff --
    
    Thanks for the questions. Let's take them one-by-one.
    
    The model here is that an operator follows the "fast schema" pattern:
    
    * The first call to `next()` produces an empty batch with only the schema.
    * The second call to `next()` returns the first data batch.
    
    The states help:
    
    * `START`: The stage in which the operator has been created, but before the first call to `next()`. When `next()` is called in the `START` state, return just the schema.
    * `SCHEMA`: The schema only has been returned. On the next call to `next()` return the data (if any) associated with the first batch.
    * `RUN`: Normal state for the second and subsequent `next()` calls.
    
    Now, do we need "fast schema"? Maybe not. I *thought* that Drill was designed to return the schema quickly to the client before waiting for the first data batch. But, in subsequent testing, I discovered that few queries actually worked that way. (Some tests counted the returned batches and asserted that there should have been only 1: with both data and schema...)
    
    So, if we want "fast schema" then we need the three states. But, if we want the original behavior, then we can, in fact, remove the `SCHEMA` state.
    
    Was there a reason for the "fast schema" path? Or, was that just a vestige of a never-completed feature?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168325765
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import java.util.Iterator;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.FragmentContextInterface;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.CloseableRecordBatch;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +/**
    + * Modular implementation of the standard Drill record batch iterator
    + * protocol. The protocol has two parts: control of the operator and
    + * access to the record batch. Each is encapsulated in separate
    + * implementation classes to allow easier customization for each
    + * situation. The operator internals are, themselves, abstracted to
    + * yet another class with the steps represented as method calls rather
    + * than as internal states as in the record batch iterator protocol.
    + * <p>
    + * Note that downstream operators make an assumption that the
    + * same vectors will appear from one batch to the next. That is,
    + * not only must the schema be the same, but if column "a" appears
    + * in two batches, the same value vector must back "a" in both
    + * batches. The <tt>TransferPair</tt> abstraction fails if different
    + * vectors appear across batches.
    + */
    +
    +public class OperatorRecordBatch implements CloseableRecordBatch {
    +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
    +
    +  private final OperatorDriver driver;
    +  private final BatchAccessor batchAccessor;
    +
    +  public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
    +    OperatorContext opContext = context.newOperatorContext(config);
    +    opContext.getStats().startProcessing();
    +
    +    // Chicken-and-egg binding: the two objects must know about each other. Pass the
    +    // context to the operator exec via a bind method.
    +
    +    try {
    +      opExec.bind(opContext);
    +      driver = new OperatorDriver(opContext, opExec);
    +      batchAccessor = opExec.batchAccessor();
    +    } catch (UserException e) {
    +      opContext.close();
    +      throw e;
    +    } catch (Throwable t) {
    +      opContext.close();
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", opExec.getClass().getSimpleName() + ".bind()")
    +        .build(logger);
    +    }
    +    finally {
    +      opContext.getStats().stopProcessing();
    +    }
    +  }
    +
    +  @Override
    +  public FragmentContext getContext() {
    +
    +    // Backward compatibility with the full server context. Awkward for testing
    +
    +    FragmentContext fragmentContext = fragmentContext();
    +    if (fragmentContext instanceof FragmentContext) {
    --- End diff --
    
    It is not clear what we are doing here. Why can't we just return fragmentContext?


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168337843
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    +    this.opContext = opServicees;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    +    }
    +  }
    +
    + /**
    +   * Start the operator executor. Bind it to the various contexts.
    +   * Then start the executor and fetch the first schema.
    +   * @return result of the first batch, which should contain
    +   * only a schema, or EOF
    +   */
    +
    +  private IterOutcome start() {
    --- End diff --
    
    It would be good to document which exceptions each of these functions might throw.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168676813
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    +    this.opContext = opServicees;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    +    }
    +  }
    +
    + /**
    +   * Start the operator executor. Bind it to the various contexts.
    +   * Then start the executor and fetch the first schema.
    +   * @return result of the first batch, which should contain
    +   * only a schema, or EOF
    +   */
    +
    +  private IterOutcome start() {
    --- End diff --
    
    Somewhere I explained the policy assumed here:
    
    * Implementations of the `OperatorExec` interface are responsible for error handling.
    * Implementations should catch exceptions then translate them to a `UserException` with user-oriented information.
    * If implementations do the above, this layer simply passes along the `UserException`.
    * Otherwise, this class acts as a translation layer to convert generic unchecked exceptions into `UserException`s. This layer can't provide much context; but it can do a slightly better job than the fragment exec, which is the last line of defense.
    
    As a result, this layer always throws `UserException`s.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r168676906
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
    +
    +  private OperatorDriver.State state = State.START;
    +
    +  /**
    +   * Operator context. The driver "owns" the context and is responsible
    +   * for closing it.
    +   */
    +
    +  private final OperatorContext opContext;
    +  private final OperatorExec operatorExec;
    +  private final BatchAccessor batchAccessor;
    +  private int schemaVersion;
    +
    +  public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
    +    this.opContext = opServicees;
    +    this.operatorExec = opExec;
    +    batchAccessor = operatorExec.batchAccessor();
    +  }
    +
    +  /**
    +   * Get the next batch. Performs initialization on the first call.
    +   * @return the iteration outcome to send downstream
    +   */
    +
    +  public IterOutcome next() {
    +    try {
    +      switch (state) {
    +      case START:
    +        return start();
    +      case RUN:
    +        return doNext();
    +       default:
    +        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
    +        return IterOutcome.NONE;
    +      }
    +    } catch (UserException e) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw e;
    +    } catch (Throwable t) {
    +      cancelSilently();
    +      state = State.FAILED;
    +      throw UserException.executionError(t)
    +        .addContext("Exception thrown from", operatorLabel())
    +        .build(OperatorRecordBatch.logger);
    +    }
    +  }
    +
    +  /**
    +   * Cancels the operator before reaching EOF.
    +   */
    +
    +  public void cancel() {
    +    try {
    +      switch (state) {
    +      case START:
    +      case RUN:
    +        cancelSilently();
    +        break;
    +      default:
    +        break;
    +      }
    +    } finally {
    +      state = State.FAILED;
    +    }
    +  }
    +
    + /**
    +   * Start the operator executor. Bind it to the various contexts.
    +   * Then start the executor and fetch the first schema.
    +   * @return result of the first batch, which should contain
    +   * only a schema, or EOF
    +   */
    +
    +  private IterOutcome start() {
    +    state = State.SCHEMA;
    +    if (operatorExec.buildSchema()) {
    +      schemaVersion = batchAccessor.schemaVersion();
    +      state = State.RUN;
    +      return IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      state = State.END;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Fetch a record batch, detecting EOF and a new schema.
    +   * @return the <tt>IterOutcome</tt> for the above cases
    +   */
    +
    +  private IterOutcome doNext() {
    +    if (! operatorExec.next()) {
    +      state = State.END;
    +      return IterOutcome.NONE;
    +    }
    +    int newVersion = batchAccessor.schemaVersion();
    +    if (newVersion != schemaVersion) {
    +      schemaVersion = newVersion;
    +      return IterOutcome.OK_NEW_SCHEMA;
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Implement a cancellation, and ignore any exception that is
    +   * thrown. We're already in trouble here, no need to keep track
    +   * of additional things that go wrong.
    +   */
    +
    +  private void cancelSilently() {
    +    try {
    +      if (state == State.SCHEMA || state == State.RUN) {
    +        operatorExec.cancel();
    +      }
    +    } catch (Throwable t) {
    +      // Ignore; we're already in a bad state.
    +      OperatorRecordBatch.logger.error("Exception thrown from cancel() for " + operatorLabel(), t);
    +    }
    +  }
    +
    +  private String operatorLabel() {
    +    return operatorExec.getClass().getCanonicalName();
    +  }
    +
    +  public void close() {
    +    if (state == State.CLOSED) {
    --- End diff --
    
    No. The goal here is simply to silently catch duplicate calls to `close()`. Whether the operator succeeded or failed, the implementation still must release its resources. But, by catching repeated `close()` calls, the implementation can assume that its own `close()` is called exactly once.


---

[GitHub] drill pull request #1121: DRILL-6153: Operator framework

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1121#discussion_r169155001
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.protocol;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    +
    +/**
    + * State machine that drives the operator executable. Converts
    + * between the iterator protocol and the operator executable protocol.
    + * Implemented as a separate class in anticipation of eventually
    + * changing the record batch (iterator) protocol.
    + */
    +
    +public class OperatorDriver {
    +  public enum State {
    +
    +    /**
    +     * Before the first call to next().
    +     */
    +
    +    START,
    +
    +    /**
    +     * The first call to next() has been made and schema (only)
    +     * was returned. On the subsequent call to next(), return any
    +     * data that might have accompanied that first batch.
    +     */
    +
    +    SCHEMA,
    +
    +    /**
    +     * The second call to next() has been made and there is more
    +     * data to deliver on subsequent calls.
    +     */
    +
    +    RUN,
    +
    +    /**
    +     * No more data to deliver.
    +     */
    +
    +    END,
    +
    +    /**
    +     * An error occurred. Operation was cancelled.
    +     */
    +
    +    FAILED,
    +
    +    /**
    +     * close() called and resources are released.
    +     */
    +
    +    CLOSED }
    --- End diff --
    
    Minor: put the closing brace on a separate line for better readability.


---