You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/03 18:47:20 UTC

[14/17] drill git commit: DRILL-6153: Operator framework

DRILL-6153: Operator framework

closes #1121


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69a5f3a9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69a5f3a9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69a5f3a9

Branch: refs/heads/master
Commit: 69a5f3a9c4fadafc588a3e325a12b98cbf359ece
Parents: 4ee207b
Author: Paul Rogers <pr...@cloudera.com>
Authored: Mon Feb 12 22:27:23 2018 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Sat Mar 3 19:47:53 2018 +0200

----------------------------------------------------------------------
 .../physical/impl/protocol/BatchAccessor.java   |  50 ++
 .../physical/impl/protocol/OperatorDriver.java  | 234 ++++++
 .../physical/impl/protocol/OperatorExec.java    | 127 ++++
 .../impl/protocol/OperatorRecordBatch.java      | 156 ++++
 .../physical/impl/protocol/SchemaTracker.java   |  98 +++
 .../impl/protocol/VectorContainerAccessor.java  | 134 ++++
 .../physical/impl/protocol/package-info.java    |  29 +
 .../impl/protocol/TestOperatorRecordBatch.java  | 747 +++++++++++++++++++
 8 files changed, 1575 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
new file mode 100644
index 0000000..b22353f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Provides access to the row set (record batch) produced by an
+ * operator. Previously, a record batch <i>was</i> an operator.
+ * In this version, the row set is a service of the operator rather
+ * than being part of the operator.
+ */
+
+public interface BatchAccessor {
+  BatchSchema getSchema();
+  int schemaVersion();
+  int getRowCount();
+  VectorContainer getOutgoingContainer();
+  TypedFieldId getValueVectorId(SchemaPath path);
+  VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
+  WritableBatch getWritableBatch();
+  SelectionVector2 getSelectionVector2();
+  SelectionVector4 getSelectionVector4();
+  Iterator<VectorWrapper<?>> iterator();
+  void release();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
new file mode 100644
index 0000000..9e6190c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+  public enum State {
+
+    /**
+     * Before the first call to next().
+     */
+
+    START,
+
+    /**
+     * The first call to next() has been made and schema (only)
+     * was returned. On the subsequent call to next(), return any
+     * data that might have accompanied that first batch.
+     */
+
+    SCHEMA,
+
+    /**
+     * The second call to next() has been made and there is more
+     * data to deliver on subsequent calls.
+     */
+
+    RUN,
+
+    /**
+     * No more data to deliver.
+     */
+
+    END,
+
+    /**
+     * An error occurred.
+     */
+
+    FAILED,
+
+    /**
+     * Operation was cancelled. No more batches will be returned,
+     * but close() has not yet been called.
+     */
+
+    CANCELED,
+
+    /**
+     * close() called and resources are released. No more batches
+     * will be returned.
+     * (This state is semantically identical to FAILED, it exists just
+     * in case an implementation needs to know the difference between the
+     * END, FAILED and CANCELED states.)
+     */
+
+    CLOSED
+  }
+
+  private OperatorDriver.State state = State.START;
+
+  /**
+   * Operator context. The driver "owns" the context and is responsible
+   * for closing it.
+   */
+
+  private final OperatorContext opContext;
+  private final OperatorExec operatorExec;
+  private final BatchAccessor batchAccessor;
+  private int schemaVersion;
+
+  public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+    this.opContext = opContext;
+    this.operatorExec = opExec;
+    batchAccessor = operatorExec.batchAccessor();
+  }
+
+  /**
+   * Get the next batch. Performs initialization on the first call.
+   * @return the iteration outcome to send downstream
+   */
+
+  public IterOutcome next() {
+    try {
+      switch (state) {
+      case START:
+        return start();
+      case RUN:
+        return doNext();
+      default:
+        OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+        return IterOutcome.NONE;
+      }
+    } catch (UserException e) {
+      cancelSilently();
+      state = State.FAILED;
+      throw e;
+    } catch (Throwable t) {
+      cancelSilently();
+      state = State.FAILED;
+      throw UserException.executionError(t)
+        .addContext("Exception thrown from", operatorLabel())
+        .build(OperatorRecordBatch.logger);
+    }
+  }
+
+  /**
+   * Cancels the operator before reaching EOF.
+   */
+
+  public void cancel() {
+    try {
+      switch (state) {
+      case START:
+      case RUN:
+        cancelSilently();
+        break;
+      default:
+        break;
+      }
+    } finally {
+      state = State.CANCELED;
+    }
+  }
+
+  /**
+   * Start the operator executor: ask it to build its schema and
+   * record the schema version so later changes can be detected.
+   * @return result of the first batch, which should contain
+   * only a schema, or EOF
+   */
+
+  private IterOutcome start() {
+    state = State.SCHEMA;
+    if (operatorExec.buildSchema()) {
+      schemaVersion = batchAccessor.schemaVersion();
+      state = State.RUN;
+      return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      state = State.END;
+      return IterOutcome.NONE;
+    }
+  }
+
+  /**
+   * Fetch a record batch, detecting EOF and a new schema.
+   * @return the <tt>IterOutcome</tt> for the above cases
+   */
+
+  private IterOutcome doNext() {
+    if (! operatorExec.next()) {
+      state = State.END;
+      return IterOutcome.NONE;
+    }
+    int newVersion = batchAccessor.schemaVersion();
+    if (newVersion != schemaVersion) {
+      schemaVersion = newVersion;
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+    return IterOutcome.OK;
+  }
+
+  /**
+   * Implement a cancellation, and ignore any exception that is
+   * thrown. We're already in trouble here, no need to keep track
+   * of additional things that go wrong.
+   */
+
+  private void cancelSilently() {
+    try {
+      if (state == State.SCHEMA || state == State.RUN) {
+        operatorExec.cancel();
+      }
+    } catch (Throwable t) {
+      // Ignore; we're already in a bad state.
+      OperatorRecordBatch.logger.error("Exception thrown from cancel() for " + operatorLabel(), t);
+    }
+  }
+
+  private String operatorLabel() {
+    return operatorExec.getClass().getCanonicalName();
+  }
+
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+    try {
+      operatorExec.close();
+    } catch (UserException e) {
+      throw e;
+    } catch (Throwable t) {
+      throw UserException.executionError(t)
+        .addContext("Exception thrown from", operatorLabel())
+        .build(OperatorRecordBatch.logger);
+    } finally {
+      opContext.close();
+      state = State.CLOSED;
+    }
+  }
+
+  public BatchAccessor batchAccessor() {
+    return batchAccessor;
+  }
+
+  public OperatorContext operatorContext() {
+    return opContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
new file mode 100644
index 0000000..57a8cf3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.exec.ops.OperatorContext;
+
+/**
+ * Core protocol for a Drill operator execution.
+ *
+ * <h4>Lifecycle</h4>
+ *
+ * <ul>
+ * <li>Creation via an operator-specific constructor in the
+ * corresponding <tt>RecordBatchCreator</tt>.</li>
+ * <li><tt>bind()</tt> called to provide the operator services.</li>
+ * <li><tt>buildSchema()</tt> called to define the schema before
+ * fetching the first record batch.</li>
+ * <li><tt>next()</tt> called repeatedly to prepare each new record
+ * batch until EOF or until cancellation.</li>
+ * <li><tt>cancel()</tt> called if the operator should quit early.</li>
+ * <li><tt>close()</tt> called to release resources. Note that
+ * <tt>close()</tt> is called in response to:<ul>
+ *   <li>EOF</li>
+ *   <li>After <tt>cancel()</tt></li>
+ *   <li>After an exception is thrown.</li></ul></li>
+ * </ul>
+ *
+ * <h4>Error Handling</h4>
+ *
+ * Any method can throw an (unchecked) exception. (Drill does not use
+ * checked exceptions.) Preferably, the code will throw a
+ * <tt>UserException</tt> that explains the error to the user. If any
+ * other kind of exception is thrown, then the enclosing class wraps it
+ * in a generic <tt>UserException</tt> that indicates that "something went
+ * wrong", which is less than ideal.
+ *
+ * <h4>Result Set</h4>
+ * The operator "publishes" a result set in response to returning
+ * <tt>true</tt> from <tt>next()</tt> by populating a
+ * {@link BatchAccessor} provided via {@link #batchAccessor()}. For
+ * compatibility with other Drill operators, the set of vectors within
+ * the batch must be the same from one batch to the next.
+ */
+
+public interface OperatorExec {
+
+  /**
+   * Bind this operator to the context. The context provides access
+   * to per-operator, per-fragment and per-Drillbit services.
+   * Also provides access to the operator definition (AKA "pop
+   * config") for this operator.
+   *
+   * @param context operator context
+   */
+
+  public void bind(OperatorContext context);
+
+  /**
+   * Provides a generic access mechanism to the batch's output data.
+   * The driver fetches the accessor up front, then reads its contents
+   * after each successful {@link #buildSchema()} or {@link #next()}. The batch itself
+   * can be held in a standard {@link VectorContainer}, or in some
+   * other structure more convenient for this operator.
+   *
+   * @return the accessor for the batch's output container
+   */
+
+  BatchAccessor batchAccessor();
+
+  /**
+   * Retrieves the schema of the batch before the first actual batch
+   * of data. The schema is returned via an empty batch (no rows,
+   * only schema) from {@link #batchAccessor()}.
+   *
+   * @return true if a schema is available, false if the operator
+   * reached EOF before a schema was found
+   */
+
+  boolean buildSchema();
+
+  /**
+   * Retrieves the next batch of data. The data is returned via
+   * the {@link #batchAccessor()} method.
+   *
+   * @return true if another batch of data is available, false if
+   * EOF was reached and no more data is available
+   */
+
+  boolean next();
+
+  /**
+   * Alerts the operator that the query was cancelled. Generally
+   * optional, but allows the operator to realize that a cancellation
+   * was requested.
+   */
+
+  void cancel();
+
+  /**
+   * Close the operator by releasing all resources that the operator
+   * held. Called after {@link #cancel()} and after {@link #buildSchema()}
+   * or {@link #next()} returns false.
+   * <p>
+   * Note that there may be a significant delay between the last call to
+   * <tt>next()</tt> and the call to <tt>close()</tt> during which
+   * downstream operators do their work. A tidy operator will release
+   * resources immediately after EOF to avoid holding onto memory or other
+   * resources that could be used by downstream operators.
+   */
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
new file mode 100644
index 0000000..4f0cff8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Modular implementation of the standard Drill record batch iterator
+ * protocol. The protocol has two parts: control of the operator and
+ * access to the record batch. Each is encapsulated in separate
+ * implementation classes to allow easier customization for each
+ * situation. The operator internals are, themselves, abstracted to
+ * yet another class with the steps represented as method calls rather
+ * than as internal states as in the record batch iterator protocol.
+ * <p>
+ * Note that downstream operators make an assumption that the
+ * same vectors will appear from one batch to the next. That is,
+ * not only must the schema be the same, but if column "a" appears
+ * in two batches, the same value vector must back "a" in both
+ * batches. The <tt>TransferPair</tt> abstraction fails if different
+ * vectors appear across batches.
+ */
+
+public class OperatorRecordBatch implements CloseableRecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
+
+  private final OperatorDriver driver;
+  private final BatchAccessor batchAccessor;
+
+  public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
+    OperatorContext opContext = context.newOperatorContext(config);
+    opContext.getStats().startProcessing();
+
+    // Chicken-and-egg binding: the two objects must know about each other. Pass the
+    // context to the operator exec via a bind method.
+
+    try {
+      opExec.bind(opContext);
+      driver = new OperatorDriver(opContext, opExec);
+      batchAccessor = opExec.batchAccessor();
+    } catch (UserException e) {
+      opContext.close();
+      throw e;
+    } catch (Throwable t) {
+      opContext.close();
+      throw UserException.executionError(t)
+        .addContext("Exception thrown from", opExec.getClass().getSimpleName() + ".bind()")
+        .build(logger);
+    }
+    finally {
+      opContext.getStats().stopProcessing();
+    }
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return fragmentContext();
+  }
+
+  // No longer needed, can be removed after all
+  // batch size control work is committed.
+
+  public FragmentContext fragmentContext() {
+    return driver.operatorContext().getFragmentContext();
+  }
+
+  @Override
+  public BatchSchema getSchema() { return batchAccessor.getSchema(); }
+
+  @Override
+  public int getRecordCount() { return batchAccessor.getRowCount(); }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    return batchAccessor.getOutgoingContainer();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return batchAccessor.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return batchAccessor.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return batchAccessor.getWritableBatch();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return batchAccessor.getSelectionVector2();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return batchAccessor.getSelectionVector4();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return batchAccessor.iterator();
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+    driver.cancel();
+  }
+
+  @Override
+  public IterOutcome next() {
+    try {
+      driver.operatorContext().getStats().startProcessing();
+      return driver.next();
+    } finally {
+      driver.operatorContext().getStats().stopProcessing();
+    }
+  }
+
+  @Override
+  public void close() {
+    driver.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
new file mode 100644
index 0000000..cd7c296
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Tracks changes to schemas via "snapshots" over time. That is, given
+ * a schema, tracks if a new schema is the same as the current one. For
+ * example, each batch output from a series of readers might be compared,
+ * as they are returned, to detect schema changes from one batch to the
+ * next. This class does not track vector-by-vector changes as a schema
+ * is built, but rather periodic "snapshots" at times determined by the
+ * operator.
+ * <p>
+ * If an operator is guaranteed to emit a consistent schema, then no
+ * checks need be done, and this tracker will report no schema change.
+ * On the other hand, a scanner might check schema more often: at least
+ * once per reader, and more often if a reader is "late-schema", that is,
+ * if the reader can change schema batch-by-batch.
+ * <p>
+ * Drill defines "schema change" in a very specific way. Not only must
+ * the set of columns be the same, and have the same types, it must also
+ * be the case that the <b>vectors</b> that hold the columns be identical.
+ * Generated code contains references to specific vector objects; passing
+ * along different vectors requires new code to be generated and is treated
+ * as a schema change.
+ * <p>
+ * Drill has no concept of "same schema, different vectors." A change in
+ * vector is just as serious as a change in schema. Hence, operators
+ * try to use the same vectors for their entire lives. That is the change
+ * tracked here.
+ */
+
+// TODO: Does not handle SV4 situations
+
+public class SchemaTracker {
+
+  private int schemaVersion;
+  private BatchSchema currentSchema;
+  private List<ValueVector> currentVectors = new ArrayList<>();
+
+  public void trackSchema(VectorContainer newBatch) {
+
+    if (! isSameSchema(newBatch)) {
+      schemaVersion++;
+      captureSchema(newBatch);
+    }
+  }
+
+  private boolean isSameSchema(VectorContainer newBatch) {
+    if (currentVectors.size() != newBatch.getNumberOfColumns()) {
+      return false;
+    }
+
+    // Compare vectors by identity: not just same type,
+    // must be same instance.
+
+    for (int i = 0; i < currentVectors.size(); i++) {
+      if (currentVectors.get(i) != newBatch.getValueVector(i).getValueVector()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void captureSchema(VectorContainer newBatch) {
+    currentVectors.clear();
+    for (VectorWrapper<?> vw : newBatch) {
+      currentVectors.add(vw.getValueVector());
+    }
+    currentSchema = newBatch.getSchema();
+  }
+
+  public int schemaVersion() { return schemaVersion; }
+  public BatchSchema schema() { return currentSchema; }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
new file mode 100644
index 0000000..e2d78d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class VectorContainerAccessor implements BatchAccessor {
+
+  public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
+
+    private SelectionVector2 sv2;
+
+    public void setSelectionVector(SelectionVector2 sv2) {
+      this.sv2 = sv2;
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+      return sv2;
+    }
+  }
+
+  public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
+
+    private SelectionVector4 sv4;
+
+    @Override
+    public SelectionVector4 getSelectionVector4() {
+      return sv4;
+    }
+  }
+
+  private VectorContainer container;
+  private SchemaTracker schemaTracker = new SchemaTracker();
+
+  /**
+   * Set the vector container. Done initially, and any time the schema of
+   * the container may have changed. May be called with the same container
+   * as the previous call, or a different one. A schema change occurs
+   * unless the vectors are identical across the two containers.
+   *
+   * @param container the container that holds vectors to be sent
+   * downstream
+   */
+
+  public void setContainer(VectorContainer container) {
+    this.container = container;
+    if (container != null) {
+      schemaTracker.trackSchema(container);
+    }
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container == null ? null : container.getSchema();
+  }
+
+  @Override
+  public int schemaVersion() { return schemaTracker.schemaVersion(); }
+
+  @Override
+  public int getRowCount() {
+    return container == null ? 0 : container.getRecordCount();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(container);
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    // Throws an exception by default because containers
+    // do not support selection vectors.
+    return container.getSelectionVector2();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    // Throws an exception by default because containers
+    // do not support selection vectors.
+     return container.getSelectionVector4();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    if (container == null) {
+      return Collections.emptyIterator();
+    } else {
+      return container.iterator();
+    }
+  }
+
+  @Override
+  public void release() { container.zeroVectors(); }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
new file mode 100644
index 0000000..11af47c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a revised implementation of the Drill RecordBatch protocol. This
+ * version separates concerns into specific classes, and creates a single
+ * "shim" class to implement the iterator protocol, deferring to specific
+ * classes as needed.
+ * <p>
+ * This version is an eventual successor to the original implementation which
+ * used the "kitchen sink" pattern to combine all functionality into a single,
+ * large record batch implementation.
+ */
+
+package org.apache.drill.exec.physical.impl.protocol;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
new file mode 100644
index 0000000..19946dd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor.ContainerAndSv2Accessor;
+import org.apache.drill.exec.proto.UserBitShared.NamePart;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test the implementation of the Drill Volcano iterator protocol that
+ * wraps the modular operator implementation.
+ */
+
+public class TestOperatorRecordBatch extends SubOperatorTest {
+  // Logger passed to the UserException builders in the failure tests below.
+  // Named for this test class so log output is attributed correctly.
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOperatorRecordBatch.class);
+
+  /**
+   * Mock operator executor that simply tracks each method call
+   * and provides a light-weight vector container. Returns a
+   * defined number of batches with an optional schema change.
+   * <p>
+   * Declared as a static nested class: it relies only on static
+   * members visible from the enclosing test ({@code fixture},
+   * {@code mockBatch()}), so it needs no hidden reference to the
+   * test instance.
+   */
+
+  private static class MockOperatorExec implements OperatorExec {
+
+    /** Set once {@link #bind} has been invoked. */
+    public boolean bindCalled;
+    /** Set once {@link #buildSchema} has been invoked. */
+    public boolean buildSchemaCalled;
+    /** Number of {@link #next} calls that report a batch before EOF. */
+    public int nextCalls = 1;
+    /** Number of {@link #next} calls received so far. */
+    public int nextCount;
+    /** The (1-based) {@link #next} call that swaps in a new schema; -1 never matches. */
+    public int schemaChangeAt = -1;
+    /** Set once {@link #cancel} has been invoked. */
+    public boolean cancelCalled;
+    /** Set once {@link #close} has run to completion. */
+    public boolean closeCalled;
+    /** When true, {@link #buildSchema} reports EOF (returns false). */
+    public boolean schemaEOF;
+    private final VectorContainerAccessor batchAccessor;
+
+    /** Creates a mock backed by the single-column container from {@code mockBatch()}. */
+    public MockOperatorExec() {
+      this(mockBatch());
+    }
+
+    /** Creates a mock that exposes the given container through a new accessor. */
+    public MockOperatorExec(VectorContainer container) {
+      batchAccessor = new VectorContainerAccessor();
+      batchAccessor.setContainer(container);
+    }
+
+    /** Creates a mock that uses a caller-supplied accessor. */
+    public MockOperatorExec(VectorContainerAccessor accessor) {
+      batchAccessor = accessor;
+    }
+
+    @Override
+    public void bind(OperatorContext context) { bindCalled = true; }
+
+    @Override
+    public BatchAccessor batchAccessor() {
+      return batchAccessor;
+    }
+
+    @Override
+    public boolean buildSchema() { buildSchemaCalled = true; return ! schemaEOF; }
+
+    /**
+     * Counts the call, then reports EOF once more than {@code nextCalls}
+     * calls have been made. On the call numbered {@code schemaChangeAt},
+     * replaces the container with one whose schema has an added
+     * VARCHAR column "b".
+     */
+    @Override
+    public boolean next() {
+      nextCount++;
+      if (nextCount > nextCalls) {
+        return false;
+      }
+      if (nextCount == schemaChangeAt) {
+        BatchSchema newSchema = new SchemaBuilder(batchAccessor.getSchema())
+            .add("b", MinorType.VARCHAR)
+            .build();
+        VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema);
+        batchAccessor.setContainer(newContainer);
+      }
+      return true;
+    }
+
+    @Override
+    public void cancel() { cancelCalled = true; }
+
+    /** Clears the outgoing container, then records that close was called. */
+    @Override
+    public void close() {
+      batchAccessor().getOutgoingContainer().clear();
+      closeCalled = true;
+    }
+  }
+
+  /**
+   * Builds a container with a single INT column "a" and no
+   * selection vector.
+   */
+  private static VectorContainer mockBatch() {
+    BatchSchema schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build();
+    VectorContainer batch = new VectorContainer(fixture.allocator(), schema);
+    batch.buildSchema(SelectionVectorMode.NONE);
+    return batch;
+  }
+
+  private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) {
+    // Dummy operator definition
+    PhysicalOperator popConfig = new Limit(null, 0, 100);
+    return new OperatorRecordBatch(fixture.getFragmentContext(), popConfig, opExec);
+  }
+
+  /**
+   * Walks through an ordinary run: schema, a data batch, a batch with
+   * a changed schema, EOF, and finally close.
+   */
+
+  @Test
+  public void testNormalLifeCycle() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 2;
+    exec.schemaChangeAt = 2;
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+
+      assertSame(fixture.getFragmentContext(), batch.fragmentContext());
+      assertNotNull(batch.getContext());
+
+      // Call 1: schema only; the executor's next() has not run yet.
+
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertTrue(exec.bindCalled);
+      assertTrue(exec.buildSchemaCalled);
+      assertEquals(0, exec.nextCount);
+
+      // Call 2: first data batch.
+
+      assertEquals(IterOutcome.OK, batch.next());
+      assertEquals(1, exec.nextCount);
+
+      // Call 3: the mock swaps its schema, reported as a new schema.
+
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertEquals(2, exec.nextCount);
+
+      // Call 4: end of data.
+
+      assertEquals(IterOutcome.NONE, batch.next());
+      assertEquals(3, exec.nextCount);
+
+      // Leaving the try block closes the batch.
+    } catch (Exception ex) {
+      fail();
+    }
+
+    assertTrue(exec.closeCalled);
+    assertFalse(exec.cancelCalled);
+  }
+
+  /**
+   * Truncated life cycle: the batch is created and closed without
+   * next() ever being called. Not a valid part of the protocol, but
+   * it should be handled anyway.
+   */
+
+  @Test
+  public void testTruncatedLifeCycle() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.schemaEOF = true;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      // Intentionally empty: only creation and close are exercised.
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.bindCalled);
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The executor reports EOF while building the schema, so the very
+   * first next() yields NONE.
+   */
+
+  @Test
+  public void testSchemaEOF() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.schemaEOF = true;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      IterOutcome first = batch.next();
+      assertEquals(IterOutcome.NONE, first);
+      assertTrue(exec.buildSchemaCalled);
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * EOF on the first data batch: the simulated source found a schema
+   * but has no rows to return.
+   */
+
+  @Test
+  public void testFirstBatchEOF() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 0;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      // Schema is delivered first ...
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertTrue(exec.buildSchemaCalled);
+      // ... then the single call to the executor's next() hits EOF.
+      assertEquals(IterOutcome.NONE, batch.next());
+      assertEquals(1, exec.nextCount);
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The caller kills the operator before ever asking for the schema.
+   */
+
+  @Test
+  public void testFailEarly() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 2;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      batch.kill(false);
+      // Nothing was started, so nothing is built, read, or cancelled.
+      assertFalse(exec.buildSchemaCalled);
+      assertEquals(0, exec.nextCount);
+      assertFalse(exec.cancelCalled);
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The caller kills the operator in mid-stream, before EOF; the kill
+   * is expected to reach the executor as a cancel.
+   */
+
+  @Test
+  public void testFailWhileReading() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 2;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertEquals(IterOutcome.OK, batch.next());
+      batch.kill(false);
+      assertTrue(exec.cancelCalled);
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The caller kills the operator after EOF but before close. An odd
+   * moment to fail, yet it must be handled.
+   */
+
+  @Test
+  public void testFailBeforeClose() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 2;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertEquals(IterOutcome.OK, batch.next());
+      assertEquals(IterOutcome.OK, batch.next());
+      assertEquals(IterOutcome.NONE, batch.next());
+      batch.kill(false);
+
+      // EOF was already reached, so the kill is not forwarded.
+
+      assertFalse(exec.cancelCalled);
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The caller kills the operator after it has been closed. This
+   * violates the operator protocol, but it must be handled anyway.
+   */
+
+  @Test
+  public void testFailAfterClose() {
+    MockOperatorExec exec = new MockOperatorExec();
+    exec.nextCalls = 2;
+
+    // Managed by hand (no try-with-resources) so that kill() can be
+    // issued after close().
+    OperatorRecordBatch batch = makeOpBatch(exec);
+    assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+    assertEquals(IterOutcome.OK, batch.next());
+    assertEquals(IterOutcome.OK, batch.next());
+    assertEquals(IterOutcome.NONE, batch.next());
+    try {
+      batch.close();
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+    batch.kill(false);
+    assertFalse(exec.cancelCalled);
+  }
+
+  /**
+   * The record batch exposes a number of vector-container operations
+   * itself rather than handing out the container. Verify that each of
+   * those methods is passed through to the container accessor.
+   */
+
+  @Test
+  public void testBatchAccessor() {
+    BatchSchema schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .build();
+    SingleRowSet rowSet = fixture.rowSetBuilder(schema)
+        .addRow(10, "fred")
+        .addRow(20, "wilma")
+        .build();
+    MockOperatorExec exec = new MockOperatorExec(rowSet.container());
+    exec.nextCalls = 1;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertEquals(schema, batch.getSchema());
+      assertEquals(2, batch.getRecordCount());
+      assertSame(rowSet.container(), batch.getOutgoingContainer());
+
+      // Vectors come back in schema order.
+
+      Iterator<VectorWrapper<?>> vectors = batch.iterator();
+      assertEquals("a", vectors.next().getValueVector().getField().getName());
+      assertEquals("b", vectors.next().getValueVector().getField().getName());
+
+      // Only checks the pass-through to the vector container; this is
+      // not a thorough test of schema path resolution.
+
+      SchemaPath colPath = SchemaPath.create(NamePart.newBuilder().setName("a").build());
+      TypedFieldId fieldId = batch.getValueVectorId(colPath);
+      assertEquals(MinorType.INT, fieldId.getFinalType().getMinorType());
+      assertEquals(1, fieldId.getFieldIds().length);
+      assertEquals(0, fieldId.getFieldIds()[0]);
+
+      colPath = SchemaPath.create(NamePart.newBuilder().setName("b").build());
+      fieldId = batch.getValueVectorId(colPath);
+      assertEquals(MinorType.VARCHAR, fieldId.getFinalType().getMinorType());
+      assertEquals(1, fieldId.getFieldIds().length);
+      assertEquals(1, fieldId.getFieldIds()[0]);
+
+      // Quick check of getValueAccessorById()
+
+      VectorWrapper<?> wrapper = batch.getValueAccessorById(IntVector.class, 0);
+      assertNotNull(wrapper);
+      assertEquals("a", wrapper.getValueVector().getField().getName());
+      wrapper = batch.getValueAccessorById(VarCharVector.class, 1);
+      assertNotNull(wrapper);
+      assertEquals("b", wrapper.getValueVector().getField().getName());
+
+      // getWritableBatch() is not covered here.
+
+      // This accessor carries no selection vectors, so both getters
+      // are expected to throw.
+
+      try {
+        batch.getSelectionVector2();
+        fail();
+      } catch (UnsupportedOperationException expected) {
+        // Expected
+      }
+      try {
+        batch.getSelectionVector4();
+        fail();
+      } catch (UnsupportedOperationException expected) {
+        // Expected
+      }
+
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * Checks when the accessor's schema version advances as different
+   * containers are handed to it.
+   */
+
+  @Test
+  public void testSchemaChange() {
+    BatchSchema schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .build();
+    SingleRowSet rowSet = fixture.rowSetBuilder(schema)
+        .addRow(10, "fred")
+        .addRow(20, "wilma")
+        .build();
+    VectorContainer original = rowSet.container();
+    MockOperatorExec exec = new MockOperatorExec(original);
+    int version = exec.batchAccessor().schemaVersion();
+
+    // Versions are expected to start at 1.
+
+    assertEquals(1, version);
+
+    // Clearing the data leaves the schema version alone
+
+    original.zeroVectors();
+    exec.batchAccessor.setContainer(original);
+    assertEquals(version, exec.batchAccessor().schemaVersion());
+
+    // A new container holding the very same vectors: no change
+
+    VectorContainer sameVectors = new VectorContainer(fixture.allocator());
+    for (VectorWrapper<?> wrapper : original) {
+      sameVectors.add(wrapper.getValueVector());
+    }
+    sameVectors.buildSchema(SelectionVectorMode.NONE);
+    exec.batchAccessor.setContainer(sameVectors);
+    assertEquals(version, exec.batchAccessor().schemaVersion());
+
+    exec.batchAccessor.setContainer(original);
+    assertEquals(version, exec.batchAccessor().schemaVersion());
+
+    // Swapping one vector for a fresh vector of the same type does
+    // count as a change.
+
+    VectorContainer modified = new VectorContainer(fixture.allocator());
+    modified.add(original.getValueVector(0).getValueVector());
+    modified.add(TypeHelper.getNewVector(
+            original.getValueVector(1).getValueVector().getField(),
+            fixture.allocator(), null));
+    modified.buildSchema(SelectionVectorMode.NONE);
+    exec.batchAccessor.setContainer(modified);
+    assertEquals(version + 1, exec.batchAccessor().schemaVersion());
+    version = exec.batchAccessor().schemaVersion();
+
+    // Setting the same container again: no change
+
+    exec.batchAccessor.setContainer(modified);
+    assertEquals(version, exec.batchAccessor().schemaVersion());
+
+    // An added vector is a change
+
+    MaterializedField extraCol = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL);
+    modified.add(TypeHelper.getNewVector(extraCol, fixture.allocator(), null));
+    modified.buildSchema(SelectionVectorMode.NONE);
+    exec.batchAccessor.setContainer(modified);
+    assertEquals(version + 1, exec.batchAccessor().schemaVersion());
+    version = exec.batchAccessor().schemaVersion();
+
+    // Setting the same container again: no change
+
+    exec.batchAccessor.setContainer(modified);
+    assertEquals(version, exec.batchAccessor().schemaVersion());
+
+    // A removed vector is a change
+
+    modified.remove(modified.getValueVector(2).getValueVector());
+    modified.buildSchema(SelectionVectorMode.NONE);
+    assertEquals(2, modified.getNumberOfColumns());
+    exec.batchAccessor.setContainer(modified);
+    assertEquals(version + 1, exec.batchAccessor().schemaVersion());
+    version = exec.batchAccessor().schemaVersion();
+
+    // Release everything that was allocated
+
+    exec.close();
+    sameVectors.clear();
+    modified.clear();
+  }
+
+  /**
+   * Verifies that an SV2 supplied through the SV2-aware container
+   * accessor is the one the record batch hands back.
+   */
+
+  @Test
+  public void testSv2() {
+    BatchSchema schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .build();
+    SingleRowSet rowSet = fixture.rowSetBuilder(schema)
+        .addRow(10, "fred")
+        .addRow(20, "wilma")
+        .withSv2()
+        .build();
+
+    ContainerAndSv2Accessor sv2Accessor = new ContainerAndSv2Accessor();
+    sv2Accessor.setContainer(rowSet.container());
+    sv2Accessor.setSelectionVector(rowSet.getSv2());
+
+    MockOperatorExec exec = new MockOperatorExec(sv2Accessor);
+    exec.nextCalls = 1;
+
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      assertSame(rowSet.getSv2(), batch.getSelectionVector2());
+
+    } catch (Exception ex) {
+      fail();
+    }
+    assertTrue(exec.closeCalled);
+
+    // The SV2 still has to be released; clear the row set to do so.
+
+    rowSet.clear();
+  }
+
+  //-----------------------------------------------------------------------
+  // Exception error cases
+  //
+  // Assumes that any of the operator executor methods could throw an
+  // exception. A wise implementation will throw a user exception that the
+  // operator just passes along. A lazy implementation will throw any old
+  // unchecked exception. Validate both cases.
+
+  // Message text raised by the mock failures below; the tests check
+  // that it appears in the resulting exception message.
+  public static final String ERROR_MSG = "My Bad!";
+
+  /**
+   * bind() throws an unchecked exception; it should surface as a
+   * UserException that wraps the original.
+   */
+
+  @Test
+  public void testWrappedExceptionOnBind() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public void bind(OperatorContext context) {
+        throw new IllegalStateException(ERROR_MSG);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertTrue(ue.getCause() instanceof IllegalStateException);
+    } catch (Throwable other) {
+      fail();
+    }
+    // Too early in the life cycle for either cancel or close.
+    assertFalse(exec.cancelCalled);
+    assertFalse(exec.closeCalled);
+  }
+
+  /**
+   * bind() throws a UserException; it should be passed along as-is,
+   * with no wrapped cause.
+   */
+
+  @Test
+  public void testUserExceptionOnBind() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public void bind(OperatorContext context) {
+        throw UserException.connectionError()
+            .message(ERROR_MSG)
+            .build(logger);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      batch.next();
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertNull(ue.getCause());
+    } catch (Throwable other) {
+      fail();
+    }
+    // Too early in the life cycle for either cancel or close.
+    assertFalse(exec.cancelCalled);
+    assertFalse(exec.closeCalled);
+  }
+
+  /**
+   * buildSchema() (reached via the first call to next()) throws an
+   * unchecked exception; it should surface wrapped in a UserException.
+   */
+  @Test
+  public void testWrappedExceptionOnBuildSchema() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public boolean buildSchema() {
+        throw new IllegalStateException(ERROR_MSG);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      batch.next();
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertTrue(ue.getCause() instanceof IllegalStateException);
+    } catch (Throwable other) {
+      fail();
+    }
+    assertTrue(exec.cancelCalled);
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * buildSchema() throws a UserException; it should be passed along
+   * unwrapped.
+   */
+
+  @Test
+  public void testUserExceptionOnBuildSchema() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public boolean buildSchema() {
+        throw UserException.dataReadError()
+            .message(ERROR_MSG)
+            .build(logger);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      batch.next();
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertNull(ue.getCause());
+    } catch (Throwable other) {
+      fail();
+    }
+    assertTrue(exec.cancelCalled);
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The executor's next() throws an unchecked exception on the second
+   * call to the batch's next(), i.e. when actually fetching a record
+   * batch; it should surface wrapped in a UserException.
+   */
+
+  @Test
+  public void testWrappedExceptionOnNext() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public boolean next() {
+        throw new IllegalStateException(ERROR_MSG);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      batch.next();
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertTrue(ue.getCause() instanceof IllegalStateException);
+    } catch (Throwable other) {
+      fail();
+    }
+    assertTrue(exec.cancelCalled);
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * The executor's next() throws a UserException when fetching a
+   * record batch; it should be passed along unwrapped.
+   */
+
+  @Test
+  public void testUserExceptionOnNext() {
+    MockOperatorExec exec = new MockOperatorExec() {
+      @Override
+      public boolean next() {
+        throw UserException.dataReadError()
+            .message(ERROR_MSG)
+            .build(logger);
+      }
+    };
+    try (OperatorRecordBatch batch = makeOpBatch(exec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, batch.next());
+      batch.next();
+      fail();
+    } catch (UserException ue) {
+      assertTrue(ue.getMessage().contains(ERROR_MSG));
+      assertNull(ue.getCause());
+    } catch (Throwable other) {
+      fail();
+    }
+    assertTrue(exec.cancelCalled);
+    assertTrue(exec.closeCalled);
+  }
+
+  /**
+   * Failure when closing the operator implementation.
+   */
+
+  @Test
+  public void testWrappedExceptionOnClose() {
+    MockOperatorExec opExec = new MockOperatorExec() {
+      @Override
+      public void close() {
+        // Release memory
+        super.close();
+        // Then fail
+        throw new IllegalStateException(ERROR_MSG);
+      }
+    };
+    opExec.nextCalls = 1;
+    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+      assertEquals(IterOutcome.OK, opBatch.next());
+      assertEquals(IterOutcome.NONE, opBatch.next());
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains(ERROR_MSG));
+      assertTrue(e.getCause() instanceof IllegalStateException);
+    } catch (Throwable t) {
+      fail();
+    }
+    assertFalse(opExec.cancelCalled);
+    assertTrue(opExec.closeCalled);
+  }
+
+  @Test
+  public void testUserExceptionOnClose() {
+    MockOperatorExec opExec = new MockOperatorExec() {
+      @Override
+      public void close() {
+        // Release memory
+        super.close();
+        // Then fail
+        throw UserException.dataReadError()
+              .message(ERROR_MSG)
+              .build(logger);
+      }
+    };
+    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+      assertEquals(IterOutcome.OK, opBatch.next());
+      assertEquals(IterOutcome.NONE, opBatch.next());
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains(ERROR_MSG));
+      assertNull(e.getCause());
+    } catch (Throwable t) {
+      fail();
+    }
+    assertFalse(opExec.cancelCalled);
+    assertTrue(opExec.closeCalled);
+  }
+}