You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/03 18:47:20 UTC
[14/17] drill git commit: DRILL-6153: Operator framework
DRILL-6153: Operator framework
closes #1121
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69a5f3a9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69a5f3a9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69a5f3a9
Branch: refs/heads/master
Commit: 69a5f3a9c4fadafc588a3e325a12b98cbf359ece
Parents: 4ee207b
Author: Paul Rogers <pr...@cloudera.com>
Authored: Mon Feb 12 22:27:23 2018 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Sat Mar 3 19:47:53 2018 +0200
----------------------------------------------------------------------
.../physical/impl/protocol/BatchAccessor.java | 50 ++
.../physical/impl/protocol/OperatorDriver.java | 234 ++++++
.../physical/impl/protocol/OperatorExec.java | 127 ++++
.../impl/protocol/OperatorRecordBatch.java | 156 ++++
.../physical/impl/protocol/SchemaTracker.java | 98 +++
.../impl/protocol/VectorContainerAccessor.java | 134 ++++
.../physical/impl/protocol/package-info.java | 29 +
.../impl/protocol/TestOperatorRecordBatch.java | 747 +++++++++++++++++++
8 files changed, 1575 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
new file mode 100644
index 0000000..b22353f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Provides access to the row set (record batch) produced by an
+ * operator. Previously, a record batch <i>was</i> an operator.
+ * In this version, the row set is a service of the operator rather
+ * than being part of the operator.
+ */
+
+public interface BatchAccessor {
+ BatchSchema getSchema();
+ int schemaVersion();
+ int getRowCount();
+ VectorContainer getOutgoingContainer();
+ TypedFieldId getValueVectorId(SchemaPath path);
+ VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
+ WritableBatch getWritableBatch();
+ SelectionVector2 getSelectionVector2();
+ SelectionVector4 getSelectionVector4();
+ Iterator<VectorWrapper<?>> iterator();
+ void release();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
new file mode 100644
index 0000000..9e6190c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+ public enum State {
+
+ /**
+ * Before the first call to next().
+ */
+
+ START,
+
+ /**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+ SCHEMA,
+
+ /**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+ RUN,
+
+ /**
+ * No more data to deliver.
+ */
+
+ END,
+
+ /**
+ * An error occurred.
+ */
+
+ FAILED,
+
+ /**
+ * Operation was cancelled. No more batches will be returned,
+ * but close() has not yet been called.
+ */
+
+ CANCELED,
+
+ /**
+ * close() has been called and resources are released. No more
+ * batches will be returned.
+ * (This state is semantically identical to FAILED, it exists just
+ * in case an implementation needs to know the difference between the
+ * END, FAILED and CANCELED states.)
+ */
+
+ CLOSED
+ }
+
+ private OperatorDriver.State state = State.START;
+
+ /**
+ * Operator context. The driver "owns" the context and is responsible
+ * for closing it.
+ */
+
+ private final OperatorContext opContext;
+ private final OperatorExec operatorExec;
+ private final BatchAccessor batchAccessor;
+ private int schemaVersion;
+
+ public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+ this.opContext = opContext;
+ this.operatorExec = opExec;
+ batchAccessor = operatorExec.batchAccessor();
+ }
+
+ /**
+ * Get the next batch. Performs initialization on the first call.
+ * @return the iteration outcome to send downstream
+ */
+
+ public IterOutcome next() {
+ try {
+ switch (state) {
+ case START:
+ return start();
+ case RUN:
+ return doNext();
+ default:
+ OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel());
+ return IterOutcome.NONE;
+ }
+ } catch (UserException e) {
+ cancelSilently();
+ state = State.FAILED;
+ throw e;
+ } catch (Throwable t) {
+ cancelSilently();
+ state = State.FAILED;
+ throw UserException.executionError(t)
+ .addContext("Exception thrown from", operatorLabel())
+ .build(OperatorRecordBatch.logger);
+ }
+ }
+
+ /**
+ * Cancels the operator before reaching EOF.
+ */
+
+ public void cancel() {
+ try {
+ switch (state) {
+ case START:
+ case RUN:
+ cancelSilently();
+ break;
+ default:
+ break;
+ }
+ } finally {
+ state = State.CANCELED;
+ }
+ }
+
+ /**
+ * Start the operator executor. Bind it to the various contexts.
+ * Then start the executor and fetch the first schema.
+ * @return result of the first batch, which should contain
+ * only a schema, or EOF
+ */
+
+ private IterOutcome start() {
+ state = State.SCHEMA;
+ if (operatorExec.buildSchema()) {
+ schemaVersion = batchAccessor.schemaVersion();
+ state = State.RUN;
+ return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ state = State.END;
+ return IterOutcome.NONE;
+ }
+ }
+
+ /**
+ * Fetch a record batch, detecting EOF and a new schema.
+ * @return the <tt>IterOutcome</tt> for the above cases
+ */
+
+ private IterOutcome doNext() {
+ if (! operatorExec.next()) {
+ state = State.END;
+ return IterOutcome.NONE;
+ }
+ int newVersion = batchAccessor.schemaVersion();
+ if (newVersion != schemaVersion) {
+ schemaVersion = newVersion;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
+ return IterOutcome.OK;
+ }
+
+ /**
+ * Implement a cancellation, and ignore any exception that is
+ * thrown. We're already in trouble here, no need to keep track
+ * of additional things that go wrong.
+ */
+
+ private void cancelSilently() {
+ try {
+ if (state == State.SCHEMA || state == State.RUN) {
+ operatorExec.cancel();
+ }
+ } catch (Throwable t) {
+ // Ignore; we're already in a bad state.
+ OperatorRecordBatch.logger.error("Exception thrown from cancel() for " + operatorLabel(), t);
+ }
+ }
+
+ private String operatorLabel() {
+ return operatorExec.getClass().getCanonicalName();
+ }
+
+ public void close() {
+ if (state == State.CLOSED) {
+ return;
+ }
+ try {
+ operatorExec.close();
+ } catch (UserException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw UserException.executionError(t)
+ .addContext("Exception thrown from", operatorLabel())
+ .build(OperatorRecordBatch.logger);
+ } finally {
+ opContext.close();
+ state = State.CLOSED;
+ }
+ }
+
+ public BatchAccessor batchAccessor() {
+ return batchAccessor;
+ }
+
+ public OperatorContext operatorContext() {
+ return opContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
new file mode 100644
index 0000000..57a8cf3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.exec.ops.OperatorContext;
+
+/**
+ * Core protocol for a Drill operator execution.
+ *
+ * <h4>Lifecycle</h4>
+ *
+ * <ul>
+ * <li>Creation via an operator-specific constructor in the
+ * corresponding <tt>RecordBatchCreator</tt>.</li>
+ * <li><tt>bind()</tt> called to provide the operator services.</li>
+ * <li><tt>buildSchema()</tt> called to define the schema before
+ * fetching the first record batch.</li>
+ * <li><tt>next()</tt> called repeatedly to prepare each new record
+ * batch until EOF or until cancellation.</li>
+ * <li><tt>cancel()</tt> called if the operator should quit early.</li>
+ * <li><tt>close()</tt> called to release resources. Note that
+ * <tt>close()</tt> is called in response to:<ul>
+ * <li>EOF</li>
+ * <li>After <tt>cancel()</tt></li>
+ * <li>After an exception is thrown.</li></ul></li>
+ * </ul>
+ *
+ * <h4>Error Handling</h4>
+ *
+ * Any method can throw an (unchecked) exception. (Drill does not use
+ * checked exceptions.) Preferably, the code will throw a
+ * <tt>UserException</tt> that explains the error to the user. If any
+ * other kind of exception is thrown, then the enclosing class wraps it
+ * in a generic <tt>UserException</tt> that indicates that "something went
+ * wrong", which is less than ideal.
+ *
+ * <h4>Result Set</h4>
+ * The operator "publishes" a result set in response to returning
+ * <tt>true</tt> from <tt>next()</tt> by populating a
+ * {@link BatchAccessor} provided via {@link #batchAccessor()}. For
+ * compatibility with other Drill operators, the set of vectors within
+ * the batch must be the same from one batch to the next.
+ */
+
+public interface OperatorExec {
+
+ /**
+ * Bind this operator to the context. The context provides access
+ * to per-operator, per-fragment and per-Drillbit services.
+ * Also provides access to the operator definition (AKA "pop
+ * config") for this operator.
+ *
+ * @param context operator context
+ */
+
+ public void bind(OperatorContext context);
+
+ /**
+ * Provides a generic access mechanism to the batch's output data.
+ * This method is called after a successful return from
+ * {@link #buildSchema()} and {@link #next()}. The batch itself
+ * can be held in a standard {@link VectorContainer}, or in some
+ * other structure more convenient for this operator.
+ *
+ * @return the accessor for the batch's output container
+ */
+
+ BatchAccessor batchAccessor();
+
+ /**
+ * Retrieves the schema of the batch before the first actual batch
+ * of data. The schema is returned via an empty batch (no rows,
+ * only schema) from {@link #batchAccessor()}.
+ *
+ * @return true if a schema is available, false if the operator
+ * reached EOF before a schema was found
+ */
+
+ boolean buildSchema();
+
+ /**
+ * Retrieves the next batch of data. The data is returned via
+ * the {@link #batchAccessor()} method.
+ *
+ * @return true if another batch of data is available, false if
+ * EOF was reached and no more data is available
+ */
+
+ boolean next();
+
+ /**
+ * Alerts the operator that the query was cancelled. Generally
+ * optional, but allows the operator to realize that a cancellation
+ * was requested.
+ */
+
+ void cancel();
+
+ /**
+ * Close the operator by releasing all resources that the operator
+ * held. Called after {@link #cancel()} and after {@link #buildSchema()}
+ * or {@link #next()} returns false.
+ * <p>
+ * Note that there may be a significant delay between the last call to
+ * <tt>next()</tt> and the call to <tt>close()</tt> during which
+ * downstream operators do their work. A tidy operator will release
+ * resources immediately after EOF to avoid holding onto memory or other
+ * resources that could be used by downstream operators.
+ */
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
new file mode 100644
index 0000000..4f0cff8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Modular implementation of the standard Drill record batch iterator
+ * protocol. The protocol has two parts: control of the operator and
+ * access to the record batch. Each is encapsulated in separate
+ * implementation classes to allow easier customization for each
+ * situation. The operator internals are, themselves, abstracted to
+ * yet another class with the steps represented as method calls rather
+ * than as internal states as in the record batch iterator protocol.
+ * <p>
+ * Note that downstream operators make an assumption that the
+ * same vectors will appear from one batch to the next. That is,
+ * not only must the schema be the same, but if column "a" appears
+ * in two batches, the same value vector must back "a" in both
+ * batches. The <tt>TransferPair</tt> abstraction fails if different
+ * vectors appear across batches.
+ */
+
+public class OperatorRecordBatch implements CloseableRecordBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
+
+ private final OperatorDriver driver;
+ private final BatchAccessor batchAccessor;
+
+ public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) {
+ OperatorContext opContext = context.newOperatorContext(config);
+ opContext.getStats().startProcessing();
+
+ // Chicken-and-egg binding: the two objects must know about each other. Pass the
+ // context to the operator exec via a bind method.
+
+ try {
+ opExec.bind(opContext);
+ driver = new OperatorDriver(opContext, opExec);
+ batchAccessor = opExec.batchAccessor();
+ } catch (UserException e) {
+ opContext.close();
+ throw e;
+ } catch (Throwable t) {
+ opContext.close();
+ throw UserException.executionError(t)
+ .addContext("Exception thrown from", opExec.getClass().getSimpleName() + ".bind()")
+ .build(logger);
+ }
+ finally {
+ opContext.getStats().stopProcessing();
+ }
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return fragmentContext();
+ }
+
+ // No longer needed, can be removed after all
+ // batch size control work is committed.
+
+ public FragmentContext fragmentContext() {
+ return driver.operatorContext().getFragmentContext();
+ }
+
+ @Override
+ public BatchSchema getSchema() { return batchAccessor.getSchema(); }
+
+ @Override
+ public int getRecordCount() { return batchAccessor.getRowCount(); }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ return batchAccessor.getOutgoingContainer();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return batchAccessor.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return batchAccessor.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return batchAccessor.getWritableBatch();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return batchAccessor.getSelectionVector2();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return batchAccessor.getSelectionVector4();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return batchAccessor.iterator();
+ }
+
+ @Override
+ public void kill(boolean sendUpstream) {
+ driver.cancel();
+ }
+
+ @Override
+ public IterOutcome next() {
+ try {
+ driver.operatorContext().getStats().startProcessing();
+ return driver.next();
+ } finally {
+ driver.operatorContext().getStats().stopProcessing();
+ }
+ }
+
+ @Override
+ public void close() {
+ driver.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
new file mode 100644
index 0000000..cd7c296
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Tracks changes to schemas via "snapshots" over time. That is, given
+ * a schema, tracks if a new schema is the same as the current one. For
+ * example, each batch output from a series of readers might be compared,
+ * as they are returned, to detect schema changes from one batch to the
+ * next. This class does not track vector-by-vector changes as a schema
+ * is built, but rather periodic "snapshots" at times determined by the
+ * operator.
+ * <p>
+ * If an operator is guaranteed to emit a consistent schema, then no
+ * checks need be done, and this tracker will report no schema change.
+ * On the other hand, a scanner might check schema more often. At least
+ * once per reader, and more often if a reader is "late-schema": if the
+ * reader can change schema batch-by-batch.
+ * <p>
+ * Drill defines "schema change" in a very specific way. Not only must
+ * the set of columns be the same, and have the same types, it must also
+ * be the case that the <b>vectors</b> that hold the columns be identical.
+ * Generated code contains references to specific vector objects; passing
+ * along different vectors requires new code to be generated and is treated
+ * as a schema change.
+ * <p>
+ * Drill has no concept of "same schema, different vectors." A change in
+ * vector is just as serious as a change in schema. Hence, operators
+ * try to use the same vectors for their entire lives. That is the change
+ * tracked here.
+ */
+
+// TODO: Does not handle SV4 situations
+
+public class SchemaTracker {
+
+ private int schemaVersion;
+ private BatchSchema currentSchema;
+ private List<ValueVector> currentVectors = new ArrayList<>();
+
+ public void trackSchema(VectorContainer newBatch) {
+
+ if (! isSameSchema(newBatch)) {
+ schemaVersion++;
+ captureSchema(newBatch);
+ }
+ }
+
+ private boolean isSameSchema(VectorContainer newBatch) {
+ if (currentVectors.size() != newBatch.getNumberOfColumns()) {
+ return false;
+ }
+
+ // Compare vectors by identity: not just same type,
+ // must be same instance.
+
+ for (int i = 0; i < currentVectors.size(); i++) {
+ if (currentVectors.get(i) != newBatch.getValueVector(i).getValueVector()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void captureSchema(VectorContainer newBatch) {
+ currentVectors.clear();
+ for (VectorWrapper<?> vw : newBatch) {
+ currentVectors.add(vw.getValueVector());
+ }
+ currentSchema = newBatch.getSchema();
+ }
+
+ public int schemaVersion() { return schemaVersion; }
+ public BatchSchema schema() { return currentSchema; }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
new file mode 100644
index 0000000..e2d78d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class VectorContainerAccessor implements BatchAccessor {
+
+ public static class ContainerAndSv2Accessor extends VectorContainerAccessor {
+
+ private SelectionVector2 sv2;
+
+ public void setSelectionVector(SelectionVector2 sv2) {
+ this.sv2 = sv2;
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv2;
+ }
+ }
+
+ public static class ContainerAndSv4Accessor extends VectorContainerAccessor {
+
+ private SelectionVector4 sv4;
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return sv4;
+ }
+ }
+
+ private VectorContainer container;
+ private SchemaTracker schemaTracker = new SchemaTracker();
+
+ /**
+ * Set the vector container. Done initially, and any time the schema of
+ * the container may have changed. May be called with the same container
+ * as the previous call, or a different one. A schema change occurs
+ * unless the vectors are identical across the two containers.
+ *
+ * @param container the container that holds vectors to be sent
+ * downstream
+ */
+
+ public void setContainer(VectorContainer container) {
+ this.container = container;
+ if (container != null) {
+ schemaTracker.trackSchema(container);
+ }
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container == null ? null : container.getSchema();
+ }
+
+ @Override
+ public int schemaVersion() { return schemaTracker.schemaVersion(); }
+
+ @Override
+ public int getRowCount() {
+ return container == null ? 0 : container.getRecordCount();
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() { return container; }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return WritableBatch.get(container);
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ // Throws an exception by default because containers
+ // do not support selection vectors.
+ return container.getSelectionVector2();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ // Throws an exception by default because containers
+ // do not support selection vectors.
+ return container.getSelectionVector4();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ if (container == null) {
+ return Collections.emptyIterator();
+ } else {
+ return container.iterator();
+ }
+ }
+
+ @Override
+ public void release() { container.zeroVectors(); }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
new file mode 100644
index 0000000..11af47c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a revised implementation of the Drill RecordBatch protocol. This
+ * version separates concerns into specific classes, and creates a single
+ * "shim" class to implement the iterator protocol, deferring to specific
+ * classes as needed.
+ * <p>
+ * This version is an eventual successor to the original implementation which
+ * used the "kitchen sink" pattern to combine all functionality into a single,
+ * large record batch implementation.
+ */
+
+package org.apache.drill.exec.physical.impl.protocol;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
new file mode 100644
index 0000000..19946dd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor.ContainerAndSv2Accessor;
+import org.apache.drill.exec.proto.UserBitShared.NamePart;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test the implementation of the Drill Volcano iterator protocol that
+ * wraps the modular operator implementation.
+ */
+
+public class TestOperatorRecordBatch extends SubOperatorTest {
+ /** Logger passed to the UserException builders in the failure tests; named for this test class. */
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOperatorRecordBatch.class);
+
+ /**
+  * Mock operator executor that records each protocol call it receives and
+  * serves batches from a light-weight vector container. It reports
+  * {@link #nextCalls} batches before signalling end-of-data and can swap in
+  * a container with an extra column to simulate a schema change.
+  * <p>
+  * Declared {@code static} because the mock uses only static members of the
+  * enclosing test and therefore needs no reference to a test instance.
+  */
+
+ private static class MockOperatorExec implements OperatorExec {
+
+   /** Set once {@link #bind(OperatorContext)} has been invoked. */
+   public boolean bindCalled;
+   /** Set once {@link #buildSchema()} has been invoked. */
+   public boolean buildSchemaCalled;
+   /** Number of {@link #next()} calls that report a batch before EOF. */
+   public int nextCalls = 1;
+   /** Number of times {@link #next()} has been invoked so far. */
+   public int nextCount;
+   /**
+    * The {@link #next()} call (counting from 1) on which the container is
+    * replaced by one with an added column; -1 never matches.
+    */
+   public int schemaChangeAt = -1;
+   /** Set once {@link #cancel()} has been invoked. */
+   public boolean cancelCalled;
+   /** Set once {@link #close()} has been invoked. */
+   public boolean closeCalled;
+   /** When true, {@link #buildSchema()} returns false. */
+   public boolean schemaEOF;
+   private final VectorContainerAccessor batchAccessor;
+
+   /** Creates a mock backed by the single-column container from mockBatch(). */
+   public MockOperatorExec() {
+     this(mockBatch());
+   }
+
+   /** Creates a mock whose accessor wraps the given container. */
+   public MockOperatorExec(VectorContainer container) {
+     batchAccessor = new VectorContainerAccessor();
+     batchAccessor.setContainer(container);
+   }
+
+   /** Creates a mock that uses the supplied, already configured accessor. */
+   public MockOperatorExec(VectorContainerAccessor accessor) {
+     batchAccessor = accessor;
+   }
+
+   @Override
+   public void bind(OperatorContext context) { bindCalled = true; }
+
+   @Override
+   public BatchAccessor batchAccessor() {
+     return batchAccessor;
+   }
+
+   @Override
+   public boolean buildSchema() { buildSchemaCalled = true; return ! schemaEOF; }
+
+   /**
+    * Counts the call, returns false once more than {@link #nextCalls} calls
+    * have been made, and otherwise returns true. On the call numbered
+    * {@link #schemaChangeAt} the container is first replaced by a new one
+    * whose schema adds a VARCHAR column "b".
+    */
+   @Override
+   public boolean next() {
+     nextCount++;
+     if (nextCount > nextCalls) {
+       return false;
+     }
+     if (nextCount == schemaChangeAt) {
+       BatchSchema newSchema = new SchemaBuilder(batchAccessor.getSchema())
+           .add("b", MinorType.VARCHAR)
+           .build();
+       VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema);
+       batchAccessor.setContainer(newContainer);
+     }
+     return true;
+   }
+
+   @Override
+   public void cancel() { cancelCalled = true; }
+
+   /** Clears the current outgoing container, then records the call. */
+   @Override
+   public void close() {
+     batchAccessor().getOutgoingContainer().clear();
+     closeCalled = true;
+   }
+ }
+
+ /**
+  * Builds the default container used by the mock operator: a single INT
+  * column "a" with no selection vector.
+  */
+ private static VectorContainer mockBatch() {
+   BatchSchema singleIntSchema = new SchemaBuilder()
+       .add("a", MinorType.INT)
+       .build();
+   VectorContainer result = new VectorContainer(fixture.allocator(), singleIntSchema);
+   result.buildSchema(SelectionVectorMode.NONE);
+   return result;
+ }
+
+ /**
+  * Wraps the given executor in an OperatorRecordBatch. The Limit operator
+  * definition is only a placeholder; these tests do not rely on its settings.
+  */
+ private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) {
+   return new OperatorRecordBatch(
+       fixture.getFragmentContext(),
+       new Limit(null, 0, 100),
+       opExec);
+ }
+
+ /**
+  * Simulate a normal run: return some batches, encounter a schema change,
+  * then reach EOF. Close must be called; cancel must not.
+  */
+
+ @Test
+ public void testNormalLifeCycle() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 2;
+   opExec.schemaChangeAt = 2;
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+
+     assertSame(fixture.getFragmentContext(), opBatch.fragmentContext());
+     assertNotNull(opBatch.getContext());
+
+     // First call to next() builds schema
+
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertTrue(opExec.bindCalled);
+     assertTrue(opExec.buildSchemaCalled);
+     assertEquals(0, opExec.nextCount);
+
+     // Second call returns the first batch
+
+     assertEquals(IterOutcome.OK, opBatch.next());
+     assertEquals(1, opExec.nextCount);
+
+     // Third call causes a schema change
+
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertEquals(2, opExec.nextCount);
+
+     // Fourth call reaches EOF
+
+     assertEquals(IterOutcome.NONE, opBatch.next());
+     assertEquals(3, opExec.nextCount);
+
+     // Close
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+
+   assertTrue(opExec.closeCalled);
+   assertFalse(opExec.cancelCalled);
+ }
+
+ /**
+  * Simulate a truncated life cycle: next() is never called. Not a valid part
+  * of the protocol, but the operator should cope with it anyway.
+  */
+
+ @Test
+ public void testTruncatedLifeCycle() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.schemaEOF = true;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     // Intentionally empty: create the batch and close it right away.
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.bindCalled);
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate reaching EOF when trying to create the schema: the first call
+  * to next() must report NONE.
+  */
+
+ @Test
+ public void testSchemaEOF() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.schemaEOF = true;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.NONE, opBatch.next());
+     assertTrue(opExec.buildSchemaCalled);
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate reaching EOF on the first batch. This simulated data source
+  * discovered a schema, but had no data.
+  */
+
+ @Test
+ public void testFirstBatchEOF() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 0;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertTrue(opExec.buildSchemaCalled);
+     assertEquals(IterOutcome.NONE, opBatch.next());
+     assertEquals(1, opExec.nextCount);
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate the caller failing the operator before getting the schema.
+  * Nothing has started, so the kill is not forwarded as a cancel.
+  */
+
+ @Test
+ public void testFailEarly() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 2;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     opBatch.kill(false);
+     assertFalse(opExec.buildSchemaCalled);
+     assertEquals(0, opExec.nextCount);
+     assertFalse(opExec.cancelCalled);
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate the caller failing the operator before EOF: the kill must be
+  * forwarded to the executor as a cancel.
+  */
+
+ @Test
+ public void testFailWhileReading() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 2;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertEquals(IterOutcome.OK, opBatch.next());
+     opBatch.kill(false);
+     assertTrue(opExec.cancelCalled);
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate the caller failing the operator after EOF but before close.
+  * This is a silly time to fail, but have to handle it anyway.
+  */
+
+ @Test
+ public void testFailBeforeClose() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 2;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertEquals(IterOutcome.OK, opBatch.next());
+     assertEquals(IterOutcome.OK, opBatch.next());
+     assertEquals(IterOutcome.NONE, opBatch.next());
+     opBatch.kill(false);
+
+     // Already hit EOF, so fail won't be passed along.
+
+     assertFalse(opExec.cancelCalled);
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Simulate the caller failing the operator after close.
+  * This violates the operator protocol, but have to handle it anyway.
+  */
+
+ @Test
+ public void testFailAfterClose() {
+   MockOperatorExec opExec = new MockOperatorExec();
+   opExec.nextCalls = 2;
+
+   // No try-with-resources here: the batch must be closed explicitly so
+   // that kill() can be issued afterwards.
+   OperatorRecordBatch opBatch = makeOpBatch(opExec);
+   assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+   assertEquals(IterOutcome.OK, opBatch.next());
+   assertEquals(IterOutcome.OK, opBatch.next());
+   assertEquals(IterOutcome.NONE, opBatch.next());
+   try {
+     opBatch.close();
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+   opBatch.kill(false);
+   assertFalse(opExec.cancelCalled);
+ }
+
+ /**
+  * The record batch abstraction has a bunch of methods to work with a vector container.
+  * Rather than simply exposing the container itself, the batch instead exposes various
+  * container operations. Probably an artifact of its history. In any event, make
+  * sure those methods are passed through to the container accessor.
+  */
+
+ @Test
+ public void testBatchAccessor() {
+   BatchSchema schema = new SchemaBuilder()
+       .add("a", MinorType.INT)
+       .add("b", MinorType.VARCHAR)
+       .build();
+   SingleRowSet rs = fixture.rowSetBuilder(schema)
+       .addRow(10, "fred")
+       .addRow(20, "wilma")
+       .build();
+   MockOperatorExec opExec = new MockOperatorExec(rs.container());
+   opExec.nextCalls = 1;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertEquals(schema, opBatch.getSchema());
+     assertEquals(2, opBatch.getRecordCount());
+     assertSame(rs.container(), opBatch.getOutgoingContainer());
+
+     Iterator<VectorWrapper<?>> iter = opBatch.iterator();
+     assertEquals("a", iter.next().getValueVector().getField().getName());
+     assertEquals("b", iter.next().getValueVector().getField().getName());
+
+     // Not a full test of the schema path; just make sure that the
+     // pass-through to the Vector Container works.
+
+     SchemaPath path = SchemaPath.create(NamePart.newBuilder().setName("a").build());
+     TypedFieldId id = opBatch.getValueVectorId(path);
+     assertEquals(MinorType.INT, id.getFinalType().getMinorType());
+     assertEquals(1, id.getFieldIds().length);
+     assertEquals(0, id.getFieldIds()[0]);
+
+     path = SchemaPath.create(NamePart.newBuilder().setName("b").build());
+     id = opBatch.getValueVectorId(path);
+     assertEquals(MinorType.VARCHAR, id.getFinalType().getMinorType());
+     assertEquals(1, id.getFieldIds().length);
+     assertEquals(1, id.getFieldIds()[0]);
+
+     // Sanity check of getValueAccessorById()
+
+     VectorWrapper<?> w = opBatch.getValueAccessorById(IntVector.class, 0);
+     assertNotNull(w);
+     assertEquals("a", w.getValueVector().getField().getName());
+     w = opBatch.getValueAccessorById(VarCharVector.class, 1);
+     assertNotNull(w);
+     assertEquals("b", w.getValueVector().getField().getName());
+
+     // getWritableBatch() ?
+
+     // No selection vectors
+
+     try {
+       opBatch.getSelectionVector2();
+       fail();
+     } catch (UnsupportedOperationException e) {
+       // Expected
+     }
+     try {
+       opBatch.getSelectionVector4();
+       fail();
+     } catch (UnsupportedOperationException e) {
+       // Expected
+     }
+
+   } catch (Exception e) {
+     // Keep the cause (the message alone may be null) so an unexpected
+     // failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Checks when the container accessor's schema version advances: it must
+  * stay put when only data or the container object changes, and increase by
+  * one when a vector is replaced, added or removed.
+  */
+ @Test
+ public void testSchemaChange() {
+   BatchSchema schema = new SchemaBuilder()
+       .add("a", MinorType.INT)
+       .add("b", MinorType.VARCHAR)
+       .build();
+   SingleRowSet rs = fixture.rowSetBuilder(schema)
+       .addRow(10, "fred")
+       .addRow(20, "wilma")
+       .build();
+   VectorContainer baseline = rs.container();
+   MockOperatorExec opExec = new MockOperatorExec(baseline);
+   VectorContainerAccessor accessor = opExec.batchAccessor;
+   int version = accessor.schemaVersion();
+
+   // Be tidy: start at 1.
+
+   assertEquals(1, version);
+
+   // Changing data does not trigger schema change
+
+   baseline.zeroVectors();
+   accessor.setContainer(baseline);
+   assertEquals(version, accessor.schemaVersion());
+
+   // Different container, same vectors, does not trigger a change
+
+   VectorContainer sameVectors = new VectorContainer(fixture.allocator());
+   for (VectorWrapper<?> wrapper : baseline) {
+     sameVectors.add(wrapper.getValueVector());
+   }
+   sameVectors.buildSchema(SelectionVectorMode.NONE);
+   accessor.setContainer(sameVectors);
+   assertEquals(version, accessor.schemaVersion());
+
+   accessor.setContainer(baseline);
+   assertEquals(version, accessor.schemaVersion());
+
+   // Replacing a vector with another of the same type does trigger
+   // a change.
+
+   VectorContainer altered = new VectorContainer(fixture.allocator());
+   altered.add(baseline.getValueVector(0).getValueVector());
+   MaterializedField secondField = baseline.getValueVector(1).getValueVector().getField();
+   altered.add(TypeHelper.getNewVector(secondField, fixture.allocator(), null));
+   altered.buildSchema(SelectionVectorMode.NONE);
+   accessor.setContainer(altered);
+   assertEquals(version + 1, accessor.schemaVersion());
+   version = accessor.schemaVersion();
+
+   // No change if same schema again
+
+   accessor.setContainer(altered);
+   assertEquals(version, accessor.schemaVersion());
+
+   // Adding a vector triggers a change
+
+   MaterializedField extraColumn = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL);
+   altered.add(TypeHelper.getNewVector(extraColumn, fixture.allocator(), null));
+   altered.buildSchema(SelectionVectorMode.NONE);
+   accessor.setContainer(altered);
+   assertEquals(version + 1, accessor.schemaVersion());
+   version = accessor.schemaVersion();
+
+   // No change if same schema again
+
+   accessor.setContainer(altered);
+   assertEquals(version, accessor.schemaVersion());
+
+   // Removing a vector triggers a change
+
+   altered.remove(altered.getValueVector(2).getValueVector());
+   altered.buildSchema(SelectionVectorMode.NONE);
+   assertEquals(2, altered.getNumberOfColumns());
+   accessor.setContainer(altered);
+   assertEquals(version + 1, accessor.schemaVersion());
+   version = accessor.schemaVersion();
+
+   // Clean up
+
+   opExec.close();
+   sameVectors.clear();
+   altered.clear();
+ }
+
+ /**
+  * Test that an SV2 is properly handled by the proper container accessor:
+  * the selection vector given to the accessor must be the one the batch
+  * hands back.
+  */
+
+ @Test
+ public void testSv2() {
+   BatchSchema schema = new SchemaBuilder()
+       .add("a", MinorType.INT)
+       .add("b", MinorType.VARCHAR)
+       .build();
+   SingleRowSet rs = fixture.rowSetBuilder(schema)
+       .addRow(10, "fred")
+       .addRow(20, "wilma")
+       .withSv2()
+       .build();
+
+   ContainerAndSv2Accessor accessor = new ContainerAndSv2Accessor();
+   accessor.setContainer(rs.container());
+   accessor.setSelectionVector(rs.getSv2());
+
+   MockOperatorExec opExec = new MockOperatorExec(accessor);
+   opExec.nextCalls = 1;
+
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     assertSame(rs.getSv2(), opBatch.getSelectionVector2());
+
+   } catch (Exception e) {
+     // Keep the cause so an unexpected failure can be diagnosed.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.closeCalled);
+
+   // Must release SV2
+
+   rs.clear();
+ }
+
+ //-----------------------------------------------------------------------
+ // Exception error cases
+ //
+ // Assumes that any of the operator executor methods could throw an
+ // exception. A wise implementation will throw a user exception that the
+ // operator just passes along. A lazy implementation will throw any old
+ // unchecked exception. Validate both cases.
+
+ /** Message carried by every simulated failure; the tests check that it appears in the reported exception. */
+ public static final String ERROR_MSG = "My Bad!";
+
+ /**
+  * Failure on the bind method: an unchecked exception thrown by bind() must
+  * surface as a UserException that wraps the original.
+  */
+
+ @Test
+ public void testWrappedExceptionOnBind() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public void bind(OperatorContext context) {
+       throw new IllegalStateException(ERROR_MSG);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertTrue(e.getCause() instanceof IllegalStateException);
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
+   assertFalse(opExec.closeCalled); // Same with close
+ }
+
+ /**
+  * Failure on the bind method: a UserException thrown by bind() must be
+  * passed along as-is, without an added cause.
+  */
+ @Test
+ public void testUserExceptionOnBind() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public void bind(OperatorContext context) {
+       throw UserException.connectionError()
+           .message(ERROR_MSG)
+           .build(logger);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     opBatch.next();
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertNull(e.getCause());
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
+   assertFalse(opExec.closeCalled); // Same with close
+ }
+
+ /**
+  * Failure when building the schema (first call to next()): an unchecked
+  * exception must surface as a UserException that wraps the original.
+  */
+ @Test
+ public void testWrappedExceptionOnBuildSchema() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public boolean buildSchema() {
+       throw new IllegalStateException(ERROR_MSG);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     opBatch.next();
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertTrue(e.getCause() instanceof IllegalStateException);
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.cancelCalled);
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Failure when building the schema (first call to next()): a UserException
+  * must be passed along as-is, without an added cause.
+  */
+ @Test
+ public void testUserExceptionOnBuildSchema() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public boolean buildSchema() {
+       throw UserException.dataReadError()
+           .message(ERROR_MSG)
+           .build(logger);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     opBatch.next();
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertNull(e.getCause());
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.cancelCalled);
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Failure on the second or subsequent calls to next(), when actually
+  * fetching a record batch: an unchecked exception must surface as a
+  * UserException that wraps the original.
+  */
+
+ @Test
+ public void testWrappedExceptionOnNext() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public boolean next() {
+       throw new IllegalStateException(ERROR_MSG);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     opBatch.next();
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertTrue(e.getCause() instanceof IllegalStateException);
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.cancelCalled);
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+  * Failure on the second or subsequent calls to next(): a UserException
+  * must be passed along as-is, without an added cause.
+  */
+ @Test
+ public void testUserExceptionOnNext() {
+   MockOperatorExec opExec = new MockOperatorExec() {
+     @Override
+     public boolean next() {
+       throw UserException.dataReadError()
+           .message(ERROR_MSG)
+           .build(logger);
+     }
+   };
+   try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+     assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+     opBatch.next();
+     fail("Expected a UserException");
+   } catch (UserException e) {
+     assertTrue(e.getMessage().contains(ERROR_MSG));
+     assertNull(e.getCause());
+   } catch (Exception e) {
+     // Assertion failures are Errors and propagate with their own message;
+     // any other exception is reported together with its cause.
+     throw new AssertionError("Unexpected exception: " + e, e);
+   }
+   assertTrue(opExec.cancelCalled);
+   assertTrue(opExec.closeCalled);
+ }
+
+ /**
+ * Failure when closing the operator implementation.
+ */
+
+ @Test
+ public void testWrappedExceptionOnClose() {
+ MockOperatorExec opExec = new MockOperatorExec() {
+ @Override
+ public void close() {
+ // Release memory
+ super.close();
+ // Then fail
+ throw new IllegalStateException(ERROR_MSG);
+ }
+ };
+ opExec.nextCalls = 1;
+ try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+ assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+ assertEquals(IterOutcome.OK, opBatch.next());
+ assertEquals(IterOutcome.NONE, opBatch.next());
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains(ERROR_MSG));
+ assertTrue(e.getCause() instanceof IllegalStateException);
+ } catch (Throwable t) {
+ fail();
+ }
+ assertFalse(opExec.cancelCalled);
+ assertTrue(opExec.closeCalled);
+ }
+
+ @Test
+ public void testUserExceptionOnClose() {
+ MockOperatorExec opExec = new MockOperatorExec() {
+ @Override
+ public void close() {
+ // Release memory
+ super.close();
+ // Then fail
+ throw UserException.dataReadError()
+ .message(ERROR_MSG)
+ .build(logger);
+ }
+ };
+ try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
+ assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
+ assertEquals(IterOutcome.OK, opBatch.next());
+ assertEquals(IterOutcome.NONE, opBatch.next());
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains(ERROR_MSG));
+ assertNull(e.getCause());
+ } catch (Throwable t) {
+ fail();
+ }
+ assertFalse(opExec.cancelCalled);
+ assertTrue(opExec.closeCalled);
+ }
+}