You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2020/05/25 22:34:52 UTC
[drill] 03/04: DRILL-7734: Revise the result set reader
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 953280b141eea5ddc83f452a73221566d27344f6
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue May 5 18:22:05 2020 -0700
DRILL-7734: Revise the result set reader
Revised into two forms: push (for streaming JSON results) and
pull (for one operator reading from another).
closes #2077
---
...sultSetReader.java => PullResultSetReader.java} | 86 ++--
.../physical/resultSet/PushResultSetReader.java | 49 +++
.../exec/physical/resultSet/ResultSetCopier.java | 19 +-
.../resultSet/impl/PullResultSetReaderImpl.java | 134 ++++++
.../resultSet/impl/PushResultSetReaderImpl.java | 107 +++++
.../resultSet/impl/ResultSetCopierImpl.java | 61 ++-
.../resultSet/impl/ResultSetReaderImpl.java | 106 -----
.../resultSet/impl/TestResultSetCopier.java | 447 +++++++++++----------
.../resultSet/impl/TestResultSetReader.java | 182 ++++++---
.../easy/text/compliant/TestCsvWithSchema.java | 4 +-
.../drill/exec/store/mock/TestMockPlugin.java | 2 +-
.../drill/test/BufferingQueryEventListener.java | 13 +-
.../java/org/apache/drill/test/PrintingUtils.java | 3 +-
.../java/org/apache/drill/test/ProfileParser.java | 6 +-
.../org/apache/drill/test/QueryBatchIterator.java | 162 ++++++++
.../java/org/apache/drill/test/QueryBuilder.java | 66 +--
.../java/org/apache/drill/test/QueryResultSet.java | 10 +-
.../org/apache/drill/test/QueryRowSetIterator.java | 70 +---
.../org/apache/drill/test/QueryRowSetReader.java | 44 ++
.../org/apache/drill/test/StatementParser.java | 84 ++++
.../apache/drill/test/rowSet/RowSetUtilities.java | 10 +-
21 files changed, 1060 insertions(+), 605 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
similarity index 57%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
index 45f3193..1877ec4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
@@ -19,24 +19,17 @@ package org.apache.drill.exec.physical.resultSet;
import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
/**
* Iterates over the set of batches in a result set, providing
* a row set reader to iterate over the rows within each batch.
- * Handles schema changes between batches.
- * <p>
- * Designed to handle batches arriving from a single upstream
- * operator. Uses Drill's strict form of schema identity: that
- * not only must the column definitions match; the vectors must
- * be identical from one batch to the next. If the vectors differ,
- * then this class assumes a new schema has occurred, and will
- * rebuild all the underlying readers, which can be costly.
+ * Handles schema changes between batches. A typical use is to
+ * iterate over batches from an upstream operator.
*
* <h4>Protocol</h4>
* <ol>
- * <li>Create an instance, passing in a
- * {@link BatchAccessor} to hold the batch and optional
- * selection vector.</li>
+ * <li>Create an instance.</li>
* <li>For each incoming batch:
* <ol>
* <li>Call {@link #start()} to attach the batch. The associated
@@ -49,39 +42,59 @@ import org.apache.drill.exec.physical.rowSet.RowSetReader;
* </ol>
* <li>Call {@link #close()} after all batches are read.</li>
* </ol>
+ * <ul>
+ * <li>Create the result set reader via a specific implementation.
+ * If a query has a null result (no rows,
+ * no schema), the code which creates the reader should instead
+ * indicate that no results are available. This interface is only for
+ * queries that produce a schema.</li>
+ * <li>Call {@link #schema()}, if desired, to obtain the schema
+ * for this result set.</li>
+ * <li>Call {@link #next()} to advance to the first batch.</li>
+ * <li>If {@code next()} returns {@code true}, then call
+ * {@link #reader()} to obtain a reader over rows. This reader also
+ * provides the batch schema.</li>
+ * <li>Use the reader to iterate over rows in the batch.</li>
+ * <li>Call {@code next()} to advance to the next batch and
+ * repeat.</li>
+ * </ul>
+ * <p>
+ * The implementation may perform complex tasks behind the scenes:
+ * coordinate with the query runner (if remote), drive an operator
+ * (if within a DAG), etc. The implementation takes an interface
+ * that interfaces with the source of batches.
+ * <p>
+ * Designed to handle batches arriving from a single upstream
+ * operator. Uses Drill's strict form of schema identity: that
+ * not only must the column definitions match; the vectors must
+ * be identical from one batch to the next. If the vectors differ,
+ * then this class assumes a new schema has occurred, and will
+ * rebuild all the underlying readers, which can be costly.
*/
-public interface ResultSetReader {
+public interface PullResultSetReader {
/**
- * Start tracking a new batch in the associated
- * vector container.
+ * Advance to the next batch of data. The iterator starts
+ * positioned before the first batch (but after obtaining
+ * a schema.)
+ * @return {@code true} if another batch is available,
+ * {@code false} if EOF
*/
- void start();
+ boolean next();
/**
- * Get the row reader for this batch. The row reader is
- * guaranteed to remain the same for the life of the
- * result set reader.
- *
- * @return the row reader to read rows for the current
- * batch
+ * Return the schema for this result set.
*/
- RowSetReader reader();
+ TupleMetadata schema();
- /**
- * Detach the batch of data from this reader. Does not
- * release the memory for that batch.
- */
- void detach();
+ int schemaVersion();
/**
- * Detach the batch of data from this reader and release
- * the memory for that batch. Call this method before
- * loading the underlying vector container with more
- * data, then call {@link #start()} after new data is
- * available.
+ * Obtain a reader to iterate over the rows of the batch. The return
+ * value will likely be the same reader each time, so that this call
+ * is optional after the first batch.
*/
- void release();
+ RowSetReader reader();
/**
* Close this reader. Releases any memory still assigned
@@ -89,11 +102,4 @@ public interface ResultSetReader {
* you want to preserve the batch memory.
*/
void close();
-
- /**
- * Convenience method to access the input batch.
- * @return the batch bound to the reader at construction
- * time
- */
- BatchAccessor inputBatch();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java
new file mode 100644
index 0000000..b011cd9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet;
+
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+
+/**
+ * Push-based result set reader, in which the caller obtains batches
+ * and registers them with the implementation. The client is thus responsible
+ * for detecting the end of batches and for releasing memory. General protocol:
+ * <ul>
+ * <li>Create an instance and bind it to a batch source.</li>
+ * <li>Obtain a batch, typically by having it passed in.</li>
+ * <li>Call {@link #start()} to obtain a reader for that batch.</li>
+ * <li>Iterate over the rows.</li>
+ * <li>Release memory for the batch.</li>
+ * </ul>
+ * <p>
+ * In Drill, batches may have the same or different schemas. Each call to
+ * {@link #start()} prepares a {@link RowSetReader} to use for
+ * the available batch. If the batch has the same schema as the previous,
+ * then the existing reader is simply repositioned at the start of the
+ * batch. If the schema changed (or this is the first batch), then a
+ * new reader is created. Thus, the client should not assume that the
+ * same reader is available across calls. However, if it is useful to
+ * cache column readers, simply check if the reader returned from
+ * {@code start()} is the same as the previous one. If so, the column
+ * readers are also the same.
+ */
+public interface PushResultSetReader {
+
+  /**
+   * Prepare a reader for the batch currently held by the batch source.
+   * Call once per batch, after the batch is available and before its
+   * memory is released.
+   *
+   * @return a reader positioned at the start of the current batch. This
+   * is the same instance as returned by the prior call if the schema is
+   * unchanged, else a new reader for the new schema.
+   */
+  RowSetReader start();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
index 5d68daf..cc18218 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
@@ -40,12 +40,13 @@ import org.apache.drill.exec.record.VectorContainer;
* each non-schema-change batch.
*
* <h4>Protocol</h4>
+ *
* Overall lifecycle:
* <ol>
* <li>Create an instance of the
* {@link org.apache.drill.exec.physical.resultSet.impl.ResultSetCopierImpl
- * ResultSetCopierImpl} class, passing the input batch
- * accessor to the constructor.</li>
+ * ResultSetCopierImpl} class, passing the input row set reader
+ * to the constructor.</li>
* <li>Loop to process each output batch as shown below. That is, continually
* process calls to the {@link BatchIterator#next()} method.</li>
* <li>Call {@link #close()}.</li>
@@ -57,8 +58,7 @@ import org.apache.drill.exec.record.VectorContainer;
* <pre><code>
* public IterOutcome next() {
* copier.startOutputBatch();
- * while (! copier.isFull() {
- * copier.freeInput();
+ * while (!copier.isFull()) {
* IterOutcome innerResult = inner.next();
* if (innerResult == DONE) { break; }
* copier.startInputBatch();
@@ -92,7 +92,6 @@ import org.apache.drill.exec.record.VectorContainer;
* Because we wish to fill the output batch, we may be able to copy
* part of a batch, the whole batch, or multiple batches to the output.
*/
-
public interface ResultSetCopier {
/**
@@ -102,9 +101,9 @@ public interface ResultSetCopier {
/**
* Start the next input batch. The input batch must be held
- * by the VectorAccessor passed into the constructor.
+ * by the {@code PullResultSetReader} passed into the constructor.
*/
- void startInputBatch();
+ boolean nextInputBatch();
/**
* If copying rows one by one, copy the next row from the
@@ -135,12 +134,6 @@ public interface ResultSetCopier {
void copyAllRows();
/**
- * Release the input. Must be called (explicitly, or via
- * {@link #copyInput()} before loading another input batch.
- */
- void releaseInputBatch();
-
- /**
* Reports if the output batch has rows. Useful after the end
* of input to determine if a partial output batch exists to
* send downstream.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
new file mode 100644
index 0000000..dc803de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Pull-based result set reader which drives an {@link UpstreamSource}
+ * to obtain batches, and presents each batch through a
+ * {@link RowSetReader}.
+ *
+ * <h4>Protocol</h4>
+ * <ol>
+ * <li>Create an instance, passing in an {@link UpstreamSource} to
+ * provide batches and an optional selection vector.</li>
+ * <li>Optionally call {@link #schema()} to obtain the schema. If no
+ * batch has yet been read, this fetches the first batch and holds it
+ * so that the following {@link #next()} returns it.</li>
+ * <li>Call {@link #next()} to advance to each batch in turn. The
+ * prior batch, if any, is released before the next one is fetched.</li>
+ * <li>While {@code next()} returns {@code true}, call
+ * {@link #reader()} to obtain a reader and iterate over the rows of
+ * the batch.</li>
+ * <li>Call {@link #close()} after all batches are read. Closing an
+ * already-closed reader has no effect.</li>
+ * </ol>
+ */
+public class PullResultSetReaderImpl implements PullResultSetReader {
+
+  /**
+   * Source of batches for this reader: the push-style source extended
+   * with the ability to fetch the next batch and release the current one.
+   */
+  public interface UpstreamSource extends PushResultSetReaderImpl.UpstreamSource {
+
+    /**
+     * Load the next batch.
+     *
+     * @return {@code true} if a batch is available, {@code false} at EOF
+     */
+    boolean next();
+
+    /**
+     * Release the memory held by the current batch, if any.
+     */
+    void release();
+  }
+
+  @VisibleForTesting
+  protected enum State {
+    START,
+    PENDING,
+    BATCH,
+    DETACHED,
+    EOF,
+    CLOSED
+  }
+
+  private final PushResultSetReaderImpl baseReader;
+  private final UpstreamSource source;
+  private State state = State.START;
+  private RowSetReader rowSetReader;
+
+  public PullResultSetReaderImpl(UpstreamSource source) {
+    this.baseReader = new PushResultSetReaderImpl(source);
+    this.source = source;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * If called before the first {@link #next()}, fetches the first batch
+   * to learn the schema and holds it for the following {@code next()}.
+   *
+   * @return the schema, or {@code null} if the reader is closed or the
+   * source provided no batches
+   */
+  @Override
+  public TupleMetadata schema() {
+    switch (state) {
+      case CLOSED:
+        return null;
+      case START:
+        if (!next()) {
+          return null;
+        }
+        // Hold the batch so that the next call to next() returns it
+        // rather than fetching another one.
+        state = State.PENDING;
+        break;
+      default:
+        break;
+    }
+    // The reader is null if the source reached EOF without providing a batch.
+    return rowSetReader == null ? null : rowSetReader.tupleSchema();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Releases the current batch, if any, before fetching the next one.
+   */
+  @Override
+  public boolean next() {
+    switch (state) {
+      case PENDING:
+        // schema() already fetched this batch: hand it out now.
+        state = State.BATCH;
+        return true;
+      case BATCH:
+        source.release();
+        break;
+      case CLOSED:
+        throw new IllegalStateException("Reader is closed");
+      case EOF:
+        return false;
+      case START:
+        break;
+      default:
+        source.release();
+    }
+    if (!source.next()) {
+      state = State.EOF;
+      return false;
+    }
+
+    rowSetReader = baseReader.start();
+    state = State.BATCH;
+    return true;
+  }
+
+  @Override
+  public int schemaVersion() { return source.schemaVersion(); }
+
+  @Override
+  public RowSetReader reader() {
+    Preconditions.checkState(state == State.BATCH, "Not in batch-ready state.");
+    return rowSetReader;
+  }
+
+  /**
+   * Release the memory of the current batch, if any, and mark the
+   * reader as closed. Has no effect if the reader is already closed.
+   */
+  @Override
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+    source.release();
+    state = State.CLOSED;
+  }
+
+  @VisibleForTesting
+  protected State state() { return state; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java
new file mode 100644
index 0000000..7fc1156
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.physical.resultSet.PushResultSetReader;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Push-based result set reader which reads the current batch from an
+ * {@link UpstreamSource}. A new {@link RowSetReader} is built only when
+ * the source reports a new schema version; otherwise the existing reader
+ * is repositioned at the start of the new batch.
+ */
+public class PushResultSetReaderImpl implements PushResultSetReader {
+
+  /**
+   * Provides the current batch, its optional selection vector, and a
+   * schema version. The version must be positive once a batch is
+   * available, and must not decrease from one batch to the next; a
+   * change in version signals a new schema.
+   */
+  public interface UpstreamSource {
+    int schemaVersion();
+    VectorContainer batch();
+    SelectionVector2 sv2();
+  }
+
+  /**
+   * Source which wraps a single {@link VectorContainer} that has no
+   * selection vector. Call {@link #newBatch()} each time the container
+   * is loaded with a new batch so that the schema version stays current.
+   */
+  public static class BatchHolder implements UpstreamSource {
+
+    private final VectorContainer container;
+    private int schemaVersion;
+
+    public BatchHolder(VectorContainer container) {
+      this.container = container;
+    }
+
+    /**
+     * Record the arrival of a new batch in the container. The first batch
+     * sets the schema version to 1; later batches increment the version
+     * only if the container reports a schema change.
+     */
+    public void newBatch() {
+      if (schemaVersion == 0) {
+        schemaVersion = 1;
+      } else if (container.isSchemaChanged()) {
+        schemaVersion++;
+      }
+    }
+
+    @Override
+    public int schemaVersion() { return schemaVersion; }
+
+    @Override
+    public VectorContainer batch() { return container; }
+
+    @Override
+    public SelectionVector2 sv2() { return null; }
+  }
+
+  private final UpstreamSource source;
+  private int priorSchemaVersion;
+  private RowSetReader rowSetReader;
+
+  public PushResultSetReaderImpl(UpstreamSource source) {
+    this.source = source;
+  }
+
+  @Override
+  public RowSetReader start() {
+    int sourceSchemaVersion = source.schemaVersion();
+    Preconditions.checkState(sourceSchemaVersion > 0,
+        "Source has no schema: schema version is %s", sourceSchemaVersion);
+    Preconditions.checkState(priorSchemaVersion <= sourceSchemaVersion,
+        "Source schema version decreased from %s to %s",
+        priorSchemaVersion, sourceSchemaVersion);
+
+    // If new schema, discard the old reader (if any) and create
+    // a new one that matches the new schema. If not a new schema,
+    // then the old reader is reused: it points to vectors which
+    // Drill requires be the same vectors as the previous batch,
+    // but with different buffers.
+    if (priorSchemaVersion != sourceSchemaVersion) {
+      rowSetReader = createRowSet().reader();
+      priorSchemaVersion = sourceSchemaVersion;
+    } else {
+      rowSetReader.newBatch();
+    }
+    return rowSetReader;
+  }
+
+  /**
+   * Wrap the source's current batch in a row set that matches the
+   * batch's selection vector mode. SV4 batches are not supported.
+   */
+  private RowSet createRowSet() {
+    // TODO: Build the reader without the need for a row set
+    VectorContainer container = source.batch();
+    switch (container.getSchema().getSelectionVectorMode()) {
+      case FOUR_BYTE:
+        throw new IllegalArgumentException("Build from SV4 not yet supported");
+      case NONE:
+        return DirectRowSet.fromContainer(container);
+      case TWO_BYTE:
+        return IndirectRowSet.fromSv2(container, source.sv2());
+      default:
+        throw new IllegalStateException("Invalid selection mode");
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
index 9c596ff..1dd8d54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
@@ -18,14 +18,12 @@
package org.apache.drill.exec.physical.resultSet.impl;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ColumnWriter;
@@ -40,6 +38,7 @@ public class ResultSetCopierImpl implements ResultSetCopier {
BATCH_ACTIVE,
NEW_SCHEMA,
SCHEMA_PENDING,
+ END_OF_INPUT,
CLOSED
}
@@ -69,7 +68,7 @@ public class ResultSetCopierImpl implements ResultSetCopier {
// Input state
private int currentSchemaVersion = -1;
- private final ResultSetReader resultSetReader;
+ private final PullResultSetReader resultSetReader;
protected RowSetReader rowReader;
// Output state
@@ -85,14 +84,14 @@ public class ResultSetCopierImpl implements ResultSetCopier {
private CopyPair[] projection;
private CopyAll activeCopy;
- public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch) {
- this(allocator, inputBatch, new ResultSetOptionBuilder());
+ public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source) {
+ this(allocator, source, new ResultSetOptionBuilder());
}
- public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch,
+ public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source,
ResultSetOptionBuilder outputOptions) {
this.allocator = allocator;
- resultSetReader = new ResultSetReaderImpl(inputBatch);
+ resultSetReader = source;
writerOptions = outputOptions;
writerOptions.vectorCache(new ResultVectorCacheImpl(allocator));
state = State.START;
@@ -104,7 +103,6 @@ public class ResultSetCopierImpl implements ResultSetCopier {
// No schema yet. Defer real batch start until we see an input
// batch.
-
state = State.NO_SCHEMA;
return;
}
@@ -112,7 +110,6 @@ public class ResultSetCopierImpl implements ResultSetCopier {
if (state == State.SCHEMA_PENDING) {
// We have a pending new schema. Create new writers to match.
-
createProjection();
}
resultSetWriter.startBatch();
@@ -120,67 +117,58 @@ public class ResultSetCopierImpl implements ResultSetCopier {
if (isCopyPending()) {
// Resume copying if a copy is active.
-
copyBlock();
}
}
@Override
- public void startInputBatch() {
+ public boolean nextInputBatch() {
+ if (state == State.END_OF_INPUT) {
+ return false;
+ }
Preconditions.checkState(state == State.NO_SCHEMA || state == State.NEW_SCHEMA ||
state == State.BATCH_ACTIVE,
"Can only start input while in an output batch");
Preconditions.checkState(!isCopyPending(),
"Finish the pending copy before changing input");
- bindInput();
+ if (!resultSetReader.next()) {
+ state = State.END_OF_INPUT;
+ return false;
+ }
+ rowReader = resultSetReader.reader();
if (state == State.BATCH_ACTIVE) {
// If no schema change, we are ready to copy.
-
- if (currentSchemaVersion == resultSetReader.inputBatch().schemaVersion()) {
- return;
+ if (currentSchemaVersion == resultSetReader.schemaVersion()) {
+ return true;
}
// The schema has changed. Handle it now or later.
-
if (hasOutputRows()) {
// Output batch has rows. Can't switch and bind inputs
// until current batch is sent downstream.
-
state = State.NEW_SCHEMA;
- return;
+ return true;
}
}
// The schema changed: first schema, or a change while a bath
// is active, but is empty.
-
if (state == State.NO_SCHEMA) {
state = State.BATCH_ACTIVE;
} else {
// Discard the unused empty batch
-
harvest().zeroVectors();
}
createProjection();
resultSetWriter.startBatch();
// Stay in the current state.
- }
-
- protected void bindInput() {
- resultSetReader.start();
- rowReader = resultSetReader.reader();
- }
-
- @Override
- public void releaseInputBatch() {
- Preconditions.checkState(state != State.CLOSED);
- resultSetReader.release();
+ return true;
}
private void createProjection() {
@@ -190,14 +178,13 @@ public class ResultSetCopierImpl implements ResultSetCopier {
// will tear down the whole show. But, the vector cache will
// ensure that the new writer reuses any matching vectors from
// the prior batch to provide vector persistence as Drill expects.
-
resultSetWriter.close();
}
- TupleMetadata schema = MetadataUtils.fromFields(resultSetReader.inputBatch().schema());
+ TupleMetadata schema = resultSetReader.schema();
writerOptions.readerSchema(schema);
resultSetWriter = new ResultSetLoaderImpl(allocator, writerOptions.build());
rowWriter = resultSetWriter.writer();
- currentSchemaVersion = resultSetReader.inputBatch().schemaVersion();
+ currentSchemaVersion = resultSetReader.schemaVersion();
int colCount = schema.size();
projection = new CopyPair[colCount];
@@ -225,6 +212,7 @@ public class ResultSetCopierImpl implements ResultSetCopier {
case BATCH_ACTIVE:
return rowWriter.isFull();
case NEW_SCHEMA:
+ case END_OF_INPUT:
return true;
default:
return false;
@@ -288,7 +276,8 @@ public class ResultSetCopierImpl implements ResultSetCopier {
@Override
public VectorContainer harvest() {
- Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA);
+ Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA ||
+ state == State.END_OF_INPUT);
VectorContainer output = resultSetWriter.harvest();
state = (state == State.BATCH_ACTIVE)
? State.BETWEEN_BATCHES : State.SCHEMA_PENDING;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
deleted file mode 100644
index 6046c97..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.resultSet.impl;
-
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
-import org.apache.drill.exec.physical.rowSet.RowSets;
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-public class ResultSetReaderImpl implements ResultSetReader {
-
- @VisibleForTesting
- protected enum State {
- START,
- BATCH,
- DETACHED,
- CLOSED
- }
-
- private State state = State.START;
- private int priorSchemaVersion;
- private final BatchAccessor batch;
- private RowSetReader rowSetReader;
-
- public ResultSetReaderImpl(BatchAccessor batch) {
- this.batch = batch;
- }
-
- @Override
- public void start() {
- Preconditions.checkState(state != State.CLOSED, "Reader is closed");
- Preconditions.checkState(state != State.BATCH,
- "Call detach/release before starting another batch");
- Preconditions.checkState(state == State.START ||
- priorSchemaVersion <= batch.schemaVersion());
- boolean newSchema = state == State.START ||
- priorSchemaVersion != batch.schemaVersion();
- state = State.BATCH;
-
- // If new schema, discard the old reader (if any, and create
- // a new one that matches the new schema. If not a new schema,
- // then the old reader is reused: it points to vectors which
- // Drill requires be the same vectors as the previous batch,
- // but with different buffers.
-
- if (newSchema) {
- rowSetReader = RowSets.wrap(batch).reader();
- priorSchemaVersion = batch.schemaVersion();
- } else {
- rowSetReader.newBatch();
- }
- }
-
- @Override
- public RowSetReader reader() {
- Preconditions.checkState(state == State.BATCH, "Call start() before requesting the reader.");
- return rowSetReader;
- }
-
- @Override
- public void detach() {
- if (state != State.START) {
- Preconditions.checkState(state == State.BATCH || state == State.DETACHED);
- state = State.DETACHED;
- }
- }
-
- @Override
- public void release() {
- if (state != State.START && state != State.DETACHED) {
- detach();
- batch.release();
- }
- }
-
- @Override
- public void close() {
- if (state != State.CLOSED) {
- release();
- state = State.CLOSED;
- }
- }
-
- @VisibleForTesting
- protected State state() { return state; }
-
- @Override
- public BatchAccessor inputBatch() { return batch; }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
index 13780de..8404dc4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
@@ -24,12 +24,11 @@ import static org.junit.Assert.fail;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.impl.protocol.IndirectContainerAccessor;
-import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -39,6 +38,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector2Builder;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
@@ -55,77 +55,95 @@ public class TestResultSetCopier extends SubOperatorTest {
.add("name", MinorType.VARCHAR)
.build();
- private static class BaseDataGen {
- protected final TupleMetadata schema;
+ private static abstract class BaseDataGen implements UpstreamSource {
+ protected int schemaVersion = 1;
protected final ResultSetLoader rsLoader;
- protected final VectorContainerAccessor batch = new VectorContainerAccessor();
+ protected VectorContainer batch;
+ protected int batchCount;
+ protected int rowCount;
+ protected int batchSize;
+ protected int batchLimit;
- public BaseDataGen(TupleMetadata schema) {
- this.schema = schema;
+ public BaseDataGen(TupleMetadata schema, int batchSize, int batchLimit) {
ResultSetOptions options = new ResultSetOptionBuilder()
.readerSchema(schema)
.vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
.build();
rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ this.batchSize = batchSize;
+ this.batchLimit = batchLimit;
}
- public TupleMetadata schema() { return schema; }
+ @Override
+ public int schemaVersion() { return schemaVersion; }
- public BatchAccessor batchAccessor() {
- return batch;
+ @Override
+ public VectorContainer batch() { return batch; }
+
+ @Override
+ public boolean next() {
+ if (batchCount >= batchLimit) {
+ return false;
+ }
+ makeBatch();
+ return true;
+ }
+
+ protected abstract void makeBatch();
+
+ @Override
+ public SelectionVector2 sv2() { return null; }
+
+ @Override
+ public void release() {
+ if (batch != null) {
+ batch.zeroVectors();
+ }
+ SelectionVector2 sv2 = sv2();
+ if (sv2 != null) {
+ sv2.clear();
+ }
}
}
private static class DataGen extends BaseDataGen {
public DataGen() {
- super(TEST_SCHEMA);
+ this(3, 1);
+ }
+
+ public DataGen(int batchSize, int batchLimit) {
+ super(TEST_SCHEMA, batchSize, batchLimit);
}
- public void makeBatch(int start, int end) {
+ @Override
+ protected void makeBatch() {
rsLoader.startBatch();
- for (int i = start; i <= end; i++) {
- rsLoader.writer().addRow(i, "Row " + i);
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
+ rsLoader.writer().addRow(rowCount, "Row " + rowCount);
}
- batch.addBatch(rsLoader.harvest());
+ batch = rsLoader.harvest();
+ batchCount++;
}
}
- public static class DataGen2 extends DataGen {
- private final int batchCount = 2;
- private final int batchSize = 5;
- private int batchIndex;
+ public static class SchemaChangeGen extends DataGen {
- boolean next() {
- if (batchIndex >= batchCount) {
- return false;
- }
- int start = nextRow();
- makeBatch(start, start + batchSize - 1);
- batchIndex++;
- return true;
- }
+ int schema1Limit;
- int nextRow() {
- return batchIndex * batchSize + 1;
+ public SchemaChangeGen(int batchSize, int batchLimit, int schema1Limit) {
+ super(batchSize, batchLimit);
+ this.schema1Limit = schema1Limit;
}
- int targetRowCount( ) {
- return batchCount * batchSize;
+ public SchemaChangeGen(int schema1Limit) {
+ super(3, 3);
+ this.schema1Limit = schema1Limit;
}
- }
- public static class SchemaChangeGen extends DataGen {
- private int batchIndex;
- public final int batchSize = 5;
- private int schemaVersion = 1;
-
- public void makeBatch2(int start, int end) {
- rsLoader.startBatch();
- for (int i = start; i <= end; i++) {
- rsLoader.writer().addRow(i, "Row " + i, i * 10);
- }
- batch.addBatch(rsLoader.harvest());
+ public SchemaChangeGen() {
+ this(2);
}
public TupleMetadata schema2() {
@@ -136,20 +154,31 @@ public class TestResultSetCopier extends SubOperatorTest {
.build();
}
- public void evolveSchema() {
- rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED));
- schemaVersion = 2;
+ @Override
+ protected void makeBatch() {
+ if (batchCount < schema1Limit) {
+ super.makeBatch();
+ } else if (batchCount == schema1Limit) {
+ evolveSchema();
+ makeBatch2();
+ } else {
+ makeBatch2();
+ }
}
- public void nextBatch() {
- int start = batchIndex * batchSize + 1;
- int end = start + batchSize - 1;
- if (schemaVersion == 1) {
- makeBatch(start, end);
- } else {
- makeBatch2(start, end);
+ public void makeBatch2() {
+ rsLoader.startBatch();
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
+ rsLoader.writer().addRow(rowCount, "Row " + rowCount, rowCount * 10);
}
- batchIndex++;
+ batch = rsLoader.harvest();
+ batchCount++;
+ }
+
+ public void evolveSchema() {
+ rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED));
+ schemaVersion = 2;
}
}
@@ -160,24 +189,28 @@ public class TestResultSetCopier extends SubOperatorTest {
.add("id", MinorType.INT)
.addNullable("name", MinorType.VARCHAR)
.addNullable("amount", MinorType.INT)
- .build());
+ .build(),
+ 10, 1);
}
- public void makeBatch(int start, int end) {
+ @Override
+ protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
- for (int i = start; i <= end; i++) {
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
writer.start();
- writer.scalar(0).setInt(i);
+ writer.scalar(0).setInt(rowCount);
if (i % 2 == 0) {
- writer.scalar(1).setString("Row " + i);
+ writer.scalar(1).setString("Row " + rowCount);
}
if (i % 3 == 0) {
- writer.scalar(2).setInt(i * 10);
+ writer.scalar(2).setInt(rowCount * 10);
}
writer.save();
}
- batch.addBatch(rsLoader.harvest());
+ batch = rsLoader.harvest();
+ batchCount++;
}
}
@@ -187,23 +220,27 @@ public class TestResultSetCopier extends SubOperatorTest {
super(new SchemaBuilder()
.add("id", MinorType.INT)
.addArray("name", MinorType.VARCHAR)
- .build());
+ .build(),
+ 3, 1);
}
- public void makeBatch(int start, int end) {
+ @Override
+ protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
ArrayWriter aw = writer.array(1);
- for (int i = start; i <= end; i++) {
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
writer.start();
- writer.scalar(0).setInt(i);
+ writer.scalar(0).setInt(rowCount);
int n = i % 3;
for (int j = 0; j < n; j++) {
- aw.scalar().setString("Row " + i + "." + j);
+ aw.scalar().setString("Row " + rowCount + "." + j);
}
writer.save();
}
- batch.addBatch(rsLoader.harvest());
+ batch = rsLoader.harvest();
+ batchCount++;
}
}
@@ -216,38 +253,80 @@ public class TestResultSetCopier extends SubOperatorTest {
.add("name", MinorType.VARCHAR)
.add("amount", MinorType.INT)
.resumeSchema()
- .build());
+ .build(),
+ 3, 1);
}
- public void makeBatch(int start, int end) {
+ @Override
+ protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
ArrayWriter aw = writer.array(1);
TupleWriter mw = aw.entry().tuple();
- for (int i = start; i <= end; i++) {
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
writer.start();
- writer.scalar(0).setInt(i);
+ writer.scalar(0).setInt(rowCount);
int n = i % 3;
for (int j = 0; j < n; j++) {
- mw.scalar(0).setString("Row " + i + "." + j);
- mw.scalar(1).setInt(i * 100 + j);
+ mw.scalar(0).setString("Row " + rowCount + "." + j);
+ mw.scalar(1).setInt(rowCount * 100 + j);
aw.save();
}
writer.save();
}
- batch.addBatch(rsLoader.harvest());
+ batch = rsLoader.harvest();
+ batchCount++;
}
}
+ public static class FilteredGen extends DataGen {
+
+ SelectionVector2 sv2;
+
+ public FilteredGen() {
+ super(10, 1);
+ }
+
+ @Override
+ protected void makeBatch() {
+ super.makeBatch();
+ makeSv2();
+ }
+
+ // Pick out every other record, in descending
+ // order.
+ private void makeSv2() {
+ SelectionVector2Builder sv2Builder =
+ new SelectionVector2Builder(fixture.allocator(), batch.getRecordCount());
+ for (int i = 0; i < 5; i++) {
+ sv2Builder.setNext(10 - 2 * i - 1);
+ }
+ sv2 = sv2Builder.harvest(batch);
+ batch.buildSchema(SelectionVectorMode.TWO_BYTE);
+ }
+
+ @Override
+ public SelectionVector2 sv2() { return sv2; }
+ }
+
+ private ResultSetCopierImpl newCopier(UpstreamSource source) {
+ PullResultSetReader reader = new PullResultSetReaderImpl(source);
+ return new ResultSetCopierImpl(fixture.allocator(), reader);
+ }
+
+ private ResultSetCopierImpl newCopier(UpstreamSource source, ResultSetOptionBuilder outputOptions) {
+ PullResultSetReader reader = new PullResultSetReaderImpl(source);
+ return new ResultSetCopierImpl(fixture.allocator(), reader, outputOptions);
+ }
+
@Test
public void testBasics() {
DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(dataGen);
// Nothing should work yet
-
try {
copier.copyAllRows();
fail();
@@ -262,28 +341,23 @@ public class TestResultSetCopier extends SubOperatorTest {
}
// Predicates should work
-
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Define a schema and start an output batch.
-
copier.startOutputBatch();
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
- // Provide an input row
-
- dataGen.makeBatch(1, 3);
- copier.startInputBatch();
+ // Provide an input batch
+ assertTrue(copier.nextInputBatch());
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Now can do some actual copying
-
while (copier.copyNextRow()) {
// empty
}
@@ -294,72 +368,62 @@ public class TestResultSetCopier extends SubOperatorTest {
// Get and verify the output batch
// (Does not free the input batch, we reuse it
// in the verify step below.)
-
RowSet result = fixture.wrap(copier.harvest());
- new RowSetComparison(fixture.wrap(dataGen.batchAccessor().container()))
+ new RowSetComparison(fixture.wrap(dataGen.batch()))
.verifyAndClear(result);
- // Copier will release the input batch
+ // No more input
+ copier.startOutputBatch();
+ assertFalse(copier.nextInputBatch());
+ // OK to try multiple times
+ assertFalse(copier.nextInputBatch());
+
+ // Copier will release the input batch
copier.close();
}
@Test
public void testImmediateClose() {
- DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new DataGen());
// Close OK before things got started
-
copier.close();
// Second close is benign
-
copier.close();
}
@Test
public void testCloseBeforeSchema() {
- DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new DataGen());
// Start batch, no data yet.
-
copier.startOutputBatch();
// Close OK before things data arrives
-
copier.close();
// Second close is benign
-
copier.close();
}
@Test
public void testCloseWithData() {
- DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new DataGen());
// Start batch, with data.
-
copier.startOutputBatch();
- dataGen.makeBatch(1, 3);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyNextRow();
// Close OK with input and output batch allocated.
-
copier.close();
// Second close is benign
-
copier.close();
}
@@ -371,27 +435,25 @@ public class TestResultSetCopier extends SubOperatorTest {
* This copier does not support merging from multiple
* streams.
*/
-
@Test
public void testMerge() {
- DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new DataGen(3, 5));
copier.startOutputBatch();
for (int i = 0; i < 5; i++) {
- int start = i * 3 + 1;
- dataGen.makeBatch(start, start + 2);
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
assertFalse(copier.isOutputFull());
copier.copyAllRows();
- copier.releaseInputBatch();
assertFalse(copier.isOutputFull());
assertFalse(copier.isCopyPending());
}
+ assertFalse(copier.nextInputBatch());
RowSet result = fixture.wrap(copier.harvest());
- dataGen.makeBatch(1, 15);
- RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+
+ // Verify with single batch with all rows
+ DataGen dataGen = new DataGen(15, 1);
+ dataGen.next();
+ RowSet expected = RowSets.wrap(dataGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
@@ -399,81 +461,67 @@ public class TestResultSetCopier extends SubOperatorTest {
@Test
public void testMultiOutput() {
- DataGen2 dataGen = new DataGen2();
- DataGen validatorGen = new DataGen();
// Equivalent of operator start() method.
-
+ DataGen dataGen = new DataGen(15, 2);
ResultSetOptionBuilder options = new ResultSetOptionBuilder()
.rowCountLimit(12);
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor(), options);
+ ResultSetCopier copier = newCopier(dataGen, options);
// Equivalent of an entire operator run
-
- int start = 1;
+ DataGen validatorGen = new DataGen(12, 2);
+ int outputCount = 0;
while (true) {
// Equivalent of operator next() method
-
copier.startOutputBatch();
while (! copier.isOutputFull()) {
- copier.releaseInputBatch();
- if (! dataGen.next()) {
+ if (!copier.nextInputBatch()) {
break;
}
- copier.startInputBatch();
copier.copyAllRows();
}
- if (! copier.hasOutputRows()) {
+ if (!copier.hasOutputRows()) {
break;
}
// Equivalent of sending downstream
-
RowSet result = fixture.wrap(copier.harvest());
- int nextRow = dataGen.nextRow();
- validatorGen.makeBatch(start, nextRow - 1);
- RowSet expected = RowSets.wrap(validatorGen.batchAccessor());
- RowSetUtilities.verify(expected, result);
- start = nextRow;
+
+ validatorGen.next();
+ RowSet expected = RowSets.wrap(validatorGen.batch());
+ RowSetUtilities.verify(expected, result, result.rowCount());
+ outputCount++;
}
// Ensure more than one output batch.
-
- assertTrue(start > 1);
+ assertTrue(outputCount > 1);
// Ensure all rows generated.
-
- assertEquals(dataGen.targetRowCount(), start - 1);
+ assertEquals(30, dataGen.rowCount);
// Simulate operator close();
-
copier.close();
}
@Test
public void testCopyRecord() {
- DataGen dataGen = new DataGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new DataGen(3, 2));
copier.startOutputBatch();
- dataGen.makeBatch(1, 3);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyRow(2);
copier.copyRow(0);
copier.copyRow(1);
- copier.releaseInputBatch();
- dataGen.makeBatch(4, 6);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyRow(1);
copier.copyRow(0);
copier.copyRow(2);
- copier.releaseInputBatch();
- RowSet expected = new RowSetBuilder(fixture.allocator(), dataGen.schema())
+ assertFalse(copier.nextInputBatch());
+
+ RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
.addRow(3, "Row 3")
.addRow(1, "Row 1")
.addRow(2, "Row 2")
@@ -481,67 +529,49 @@ public class TestResultSetCopier extends SubOperatorTest {
.addRow(4, "Row 4")
.addRow(6, "Row 6")
.build();
- RowSet result = fixture.wrap(copier.harvest());
- RowSetUtilities.verify(expected, result);
+ RowSetUtilities.verify(expected, fixture.wrap(copier.harvest()));
copier.close();
}
@Test
public void testSchemaChange() {
- SchemaChangeGen dataGen = new SchemaChangeGen();
- SchemaChangeGen verifierGen = new SchemaChangeGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new SchemaChangeGen(3, 4, 2));
// Copy first batch with first schema
-
copier.startOutputBatch();
- dataGen.nextBatch();
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
// Second, same schema
-
- copier.releaseInputBatch();
- dataGen.nextBatch();
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
// Plenty of room. But, change the schema.
-
- copier.releaseInputBatch();
- dataGen.evolveSchema();
- dataGen.nextBatch();
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
assertTrue(copier.isOutputFull());
// Must harvest partial output
-
RowSet result = fixture.wrap(copier.harvest());
- verifierGen.makeBatch(1, 2 * dataGen.batchSize - 1);
- RowSet expected = RowSets.wrap(verifierGen.batchAccessor());
+ SchemaChangeGen verifierGen = new SchemaChangeGen(6, 2, 1);
+ verifierGen.next();
+ RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
// Start a new batch, implicitly complete pending copy
-
copier.startOutputBatch();
copier.copyAllRows();
// Add one more of second schema
-
- copier.releaseInputBatch();
- dataGen.nextBatch();
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
result = fixture.wrap(copier.harvest());
- verifierGen.evolveSchema();
- verifierGen.makeBatch2(2 * dataGen.batchSize + 1, 4 * dataGen.batchSize - 1);
- expected = RowSets.wrap(verifierGen.batchAccessor());
+ verifierGen.next();
+ expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
assertFalse(copier.isCopyPending());
@@ -553,31 +583,11 @@ public class TestResultSetCopier extends SubOperatorTest {
@Test
public void testSV2() {
- DataGen dataGen = new DataGen();
- IndirectContainerAccessor filtered = new IndirectContainerAccessor();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), filtered);
+ ResultSetCopier copier = newCopier(new FilteredGen());
copier.startOutputBatch();
- dataGen.makeBatch(1, 10);
-
- // Pick out every other record, in descending
- // order.
-
- VectorContainer container = dataGen.batchAccessor().container();
- SelectionVector2Builder sv2Builder =
- new SelectionVector2Builder(fixture.allocator(), container.getRecordCount());
- for (int i = 0; i < 5; i++) {
- sv2Builder.setNext(10 - 2 * i - 1);
- }
- container.buildSchema(SelectionVectorMode.TWO_BYTE);
- filtered.addBatch(container);
- filtered.setSelectionVector(sv2Builder.harvest(container));
- assertEquals(5, filtered.rowCount());
-
- copier.startInputBatch();
+ assertTrue(copier.nextInputBatch());
copier.copyAllRows();
- copier.releaseInputBatch();
RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
.addRow(10, "Row 10")
@@ -599,17 +609,16 @@ public class TestResultSetCopier extends SubOperatorTest {
@Test
public void testNullable() {
- NullableGen dataGen = new NullableGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new NullableGen());
copier.startOutputBatch();
- dataGen.makeBatch(1, 10);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
- RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+ NullableGen verifierGen = new NullableGen();
+ verifierGen.next();
+ RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
@@ -617,17 +626,16 @@ public class TestResultSetCopier extends SubOperatorTest {
@Test
public void testArrays() {
- ArrayGen dataGen = new ArrayGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new ArrayGen());
copier.startOutputBatch();
- dataGen.makeBatch(1, 5);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
- RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+ ArrayGen verifierGen = new ArrayGen();
+ verifierGen.next();
+ RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
@@ -635,17 +643,16 @@ public class TestResultSetCopier extends SubOperatorTest {
@Test
public void testMaps() {
- MapGen dataGen = new MapGen();
- ResultSetCopier copier = new ResultSetCopierImpl(
- fixture.allocator(), dataGen.batchAccessor());
+ ResultSetCopier copier = newCopier(new MapGen());
copier.startOutputBatch();
- dataGen.makeBatch(1, 5);
- copier.startInputBatch();
+ copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
- RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+ MapGen verifierGen = new MapGen();
+ verifierGen.next();
+ RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
index 05c5430..08ad5ac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
@@ -18,98 +18,140 @@
package org.apache.drill.exec.physical.resultSet.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
public class TestResultSetReader extends SubOperatorTest {
- public static class BatchGenerator {
+ private static final TupleMetadata SCHEMA1 = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .add("name", MinorType.VARCHAR)
+ .build();
+ private static final TupleMetadata SCHEMA2 = new SchemaBuilder()
+ .addAll(SCHEMA1)
+ .add("amount", MinorType.INT)
+ .build();
+
+ public static class BatchGenerator implements UpstreamSource {
private enum State { SCHEMA1, SCHEMA2 };
private final ResultSetLoader rsLoader;
- private final VectorContainerAccessor batch = new VectorContainerAccessor();
+ private VectorContainer batch;
+ private int schemaVersion;
private State state;
+ private int batchCount;
+ private int rowCount;
+ private final int schema1Count;
+ private final int schema2Count;
+ private final int batchSize;
- public BatchGenerator() {
- TupleMetadata schema1 = new SchemaBuilder()
- .add("id", MinorType.INT)
- .add("name", MinorType.VARCHAR)
- .build();
+ public BatchGenerator(int batchSize, int schema1Count, int schema2Count) {
ResultSetOptions options = new ResultSetOptionBuilder()
- .readerSchema(schema1)
+ .readerSchema(SCHEMA1)
.vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
.build();
- rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
- state = State.SCHEMA1;
+ this.rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ this.state = State.SCHEMA1;
+ this.batchSize = batchSize;
+ this.schemaVersion = 1;
+ this.schema1Count = schema1Count;
+ this.schema2Count = schema2Count;
}
- public void batch1(int start, int end) {
+ public void batch1() {
Preconditions.checkState(state == State.SCHEMA1);
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
- for (int i = start; i <= end; i++) {
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
writer.start();
- writer.scalar("id").setInt(i);
- writer.scalar("name").setString("Row" + i);
+ writer.scalar("id").setInt(rowCount);
+ writer.scalar("name").setString("Row" + rowCount);
writer.save();
}
- batch.addBatch(rsLoader.harvest());
+ batch = rsLoader.harvest();
+ batchCount++;
}
- public void batch2(int start, int end) {
+ public void batch2() {
RowSetLoader writer = rsLoader.writer();
if (state == State.SCHEMA1) {
- ColumnMetadata balCol = MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED);
- writer.addColumn(balCol);
+ writer.addColumn(SCHEMA2.metadata("amount"));
state = State.SCHEMA2;
+ schemaVersion++;
}
rsLoader.startBatch();
- for (int i = start; i <= end; i++) {
+ for (int i = 0; i < batchSize; i++) {
+ rowCount++;
writer.start();
- writer.scalar("id").setInt(i);
- writer.scalar("name").setString("Row" + i);
- writer.scalar("amount").setInt(i * 10);
+ writer.scalar("id").setInt(rowCount);
+ writer.scalar("name").setString("Row" + rowCount);
+ writer.scalar("amount").setInt(rowCount * 10);
writer.save();
}
- batch.addBatch(rsLoader.harvest());
- }
-
- public BatchAccessor batchAccessor() {
- return batch;
+ batch = rsLoader.harvest();
+ batchCount++;
}
public void close() {
rsLoader.close();
}
+
+ @Override
+ public boolean next() {
+ if (batchCount == schema1Count + schema2Count) {
+ return false;
+ }
+ if (batchCount < schema1Count) {
+ batch1();
+ } else {
+ batch2();
+ }
+ return true;
+ }
+
+ @Override
+ public int schemaVersion() { return schemaVersion; }
+
+ @Override
+ public VectorContainer batch() { return batch; }
+
+ @Override
+ public SelectionVector2 sv2() { return null; }
+
+ @Override
+ public void release() {
+ if (batch != null) {
+ batch.zeroVectors();
+ }
+ }
}
@Test
public void testBasics() {
- BatchGenerator gen = new BatchGenerator();
- ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+ PullResultSetReader rsReader = new PullResultSetReaderImpl(
+ new BatchGenerator(10, 2, 1));
// Start state
-
try {
rsReader.reader();
fail();
@@ -117,16 +159,15 @@ public class TestResultSetReader extends SubOperatorTest {
// Expected
}
- // OK to detach with no input
- rsReader.detach();
- rsReader.release();
+ // Ask for schema. Does an implicit next.
+ assertEquals(SCHEMA1, rsReader.schema());
+ assertEquals(1, rsReader.schemaVersion());
- // Make a batch. Verify reader is attached.
+ // Move to the first batch.
// (Don't need to do a full reader test, that is already done
// elsewhere.)
-
- gen.batch1(1, 10);
- rsReader.start();
+ assertTrue(rsReader.next());
+ assertEquals(1, rsReader.schemaVersion());
RowSetReader reader1;
{
RowSetReader reader = rsReader.reader();
@@ -135,18 +176,10 @@ public class TestResultSetReader extends SubOperatorTest {
assertEquals(1, reader.scalar("id").getInt());
assertEquals("Row1", reader.scalar("name").getString());
}
- rsReader.release();
- try {
- rsReader.reader();
- fail();
- } catch (IllegalStateException e) {
- // Expected
- }
-
- // Another batch of same schema
- gen.batch1(11, 20);
- rsReader.start();
+ // Second batch, same schema.
+ assertTrue(rsReader.next());
+ assertEquals(1, rsReader.schemaVersion());
{
RowSetReader reader = rsReader.reader();
assertSame(reader1, reader);
@@ -155,12 +188,10 @@ public class TestResultSetReader extends SubOperatorTest {
assertEquals(11, reader.scalar("id").getInt());
assertEquals("Row11", reader.scalar("name").getString());
}
- rsReader.release();
// Batch with new schema
-
- gen.batch2(21, 30);
- rsReader.start();
+ assertTrue(rsReader.next());
+ assertEquals(2, rsReader.schemaVersion());
{
RowSetReader reader = rsReader.reader();
assertNotSame(reader1, reader);
@@ -170,23 +201,50 @@ public class TestResultSetReader extends SubOperatorTest {
assertEquals("Row21", reader.scalar("name").getString());
assertEquals(210, reader.scalar("amount").getInt());
}
- rsReader.release();
+ assertFalse(rsReader.next());
rsReader.close();
}
@Test
public void testCloseAtStart() {
- BatchGenerator gen = new BatchGenerator();
- ResultSetReaderImpl rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+ PullResultSetReader rsReader = new PullResultSetReaderImpl(
+ new BatchGenerator(10, 2, 1));
// Close OK in start state
+ rsReader.close();
+ // Second close OK
+ rsReader.close();
+ }
+
+ @Test
+ public void testCloseDuringRead() {
+ PullResultSetReader rsReader = new PullResultSetReaderImpl(
+ new BatchGenerator(10, 2, 1));
+
+ // Move to first batch
+ assertTrue(rsReader.next());
+
+    // Close OK while positioned on a batch
rsReader.close();
- assertEquals(ResultSetReaderImpl.State.CLOSED, rsReader.state());
// Second close OK
+ rsReader.close();
+ }
+
+ @Test
+ public void testCloseAfterNext() {
+ PullResultSetReader rsReader = new PullResultSetReaderImpl(
+ new BatchGenerator(10, 2, 1));
+ // Move to first batch
+ assertTrue(rsReader.next());
+
+    // Close OK after next() returned a batch
+ rsReader.close();
+
+ // Second close OK
rsReader.close();
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index db1ab11..0458aee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -321,8 +320,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
RowSet result = client.queryBuilder().sql(sql).rowSet();
assertEquals(4, result.rowCount());
result.clear();
- } catch (RpcException e) {
- assertTrue(e.getCause() instanceof UserRemoteException);
+ } catch (UserRemoteException e) {
sawError = true;
break;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
index 6acdc9c..c7ddd9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
@@ -29,8 +29,8 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.test.QueryRowSetIterator;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.junit.BeforeClass;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
index 74a8e65..7618456 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Drill query event listener that buffers rows into a producer-consumer
@@ -36,13 +38,10 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
* Query messages are transformed into events: query ID, batch,
* EOF or error.
*/
+public class BufferingQueryEventListener implements UserResultsListener {
+ private static final Logger logger = LoggerFactory.getLogger(BufferingQueryEventListener.class);
-public class BufferingQueryEventListener implements UserResultsListener
-{
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferingQueryEventListener.class);
-
- public static class QueryEvent
- {
+ public static class QueryEvent {
public enum Type { QUERY_ID, BATCH, EOF, ERROR }
public final Type type;
@@ -72,7 +71,7 @@ public class BufferingQueryEventListener implements UserResultsListener
}
}
- private BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
+ private final BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
@Override
public void queryIdArrived(QueryId queryId) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
index df3a217..c898e0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -18,8 +18,9 @@
package org.apache.drill.test;
import ch.qos.logback.classic.Level;
-import org.apache.drill.exec.client.LoggingResultsListener;
+
import org.apache.drill.common.util.function.CheckedSupplier;
+import org.apache.drill.exec.client.LoggingResultsListener;
import org.apache.drill.exec.util.VectorUtil;
import java.util.function.Supplier;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index 8577448..e4dcd98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -39,6 +39,8 @@ import javax.json.JsonValue;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Parses a query profile and provides access to various bits of the profile
@@ -46,7 +48,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
*/
public class ProfileParser {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileParser.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProfileParser.class);
/**
* The original JSON profile.
@@ -394,7 +396,7 @@ public class ProfileParser {
logger.info("Can't find operator def: {}-{}", major.id, op.opId);
continue;
}
- op.opName = CoreOperatorType.valueOf(op.type).name();
+ op.opName = CoreOperatorType.forNumber(op.type).name();
op.opName = op.opName.replace("_", " ");
op.name = opDef.name;
if (op.name.equalsIgnoreCase(op.opName)) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
new file mode 100644
index 0000000..2bace96
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+
+/**
+ * Iterator over batches returned from a query. Uses a listener to obtain
+ * the serialized batches, then decodes them into value vectors held
+ * in a vector container - the same structure as used in the query
+ * executor. This format allows us to use the "row set" classes on the
+ * output of the query.
+ * <p>
+ * A single {@link RecordBatchLoader} is reused for every batch: the
+ * container returned by {@link #batch()} is reloaded by each call to
+ * {@link #next()} and cleared by {@link #release()}. Call
+ * {@link #retainData()} before {@link #close()} to keep the current
+ * batch's vectors after the iterator is closed.
+ */
+public class QueryBatchIterator implements UpstreamSource, AutoCloseable {
+
+  private enum State { START, RUN, RETAIN, EOF }
+
+  private final BufferingQueryEventListener listener;
+  private final RecordBatchLoader loader;
+  private State state = State.START;
+  private boolean retainData;
+  private QueryId queryId;
+  private QueryState queryState;
+  private int schemaVersion;
+  private int recordCount;
+  private int batchCount;
+
+  public QueryBatchIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
+    this.listener = listener;
+    this.loader = new RecordBatchLoader(allocator);
+  }
+
+  /** @return the query ID, or {@code null} until the QUERY_ID event has been read */
+  public QueryId queryId() { return queryId; }
+  public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
+  /** @return the query state carried by the most recently read event */
+  public QueryState finalState() { return queryState; }
+  /** @return the number of batches received, including discarded null batches */
+  public int batchCount() { return batchCount; }
+  /** @return the total number of rows received so far */
+  public int rowCount() { return recordCount; }
+
+  /**
+   * Advances to the next non-null batch.
+   *
+   * @return {@code true} if a batch was loaded into {@link #batch()},
+   * {@code false} at end of data
+   * @throws UserException if the query failed with a user exception;
+   * any other failure is wrapped in a {@link RuntimeException}
+   */
+  @Override
+  public boolean next() {
+    retainData = false;
+    if (state == State.EOF) {
+      return false;
+    }
+    while (true) {
+      QueryEvent event = listener.get();
+      queryState = event.state;
+      switch (event.type) {
+        case BATCH:
+
+          // Skip over null batches
+          if (loadBatch(event)) {
+            return true;
+          }
+          break;
+        case EOF:
+          state = State.EOF;
+          return false;
+        case ERROR:
+          state = State.EOF;
+          if (event.error instanceof UserException) {
+            throw (UserException) event.error;
+          } else {
+            throw new RuntimeException(event.error);
+          }
+        case QUERY_ID:
+          queryId = event.queryId;
+          break;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
+      }
+    }
+  }
+
+  /**
+   * Decodes one serialized batch into the loader's container.
+   *
+   * @return {@code false} if the batch was a "null" batch (no rows and no
+   * columns), which is discarded; {@code true} otherwise
+   */
+  private boolean loadBatch(QueryEvent event) {
+    // Validate the batch before anything dereferences it.
+    QueryDataBatch inputBatch = Preconditions.checkNotNull(event.batch);
+    batchCount++;
+    recordCount += inputBatch.getHeader().getRowCount();
+
+    // Unload the batch and convert to a row set. The serialized batch is
+    // released even if decoding fails, so its buffers are not leaked.
+    try {
+      loader.load(inputBatch.getHeader().getDef(), inputBatch.getData());
+    } finally {
+      inputBatch.release();
+    }
+    VectorContainer batch = loader.getContainer();
+    batch.setRecordCount(loader.getRecordCount());
+
+    // Null results? Drill will return a single batch with no rows
+    // and no columns even if the scan (or other) operator returns
+    // no batches at all. For ease of testing, simply map this null
+    // result set to a null output row set that says "nothing at all
+    // was returned." Note that this is different than an empty result
+    // set which has a schema, but no rows.
+    if (batch.getRecordCount() == 0 && batch.getNumberOfColumns() == 0) {
+      release();
+      return false;
+    }
+
+    // The first real batch defines schema version 1; later schema
+    // changes bump the version.
+    if (state == State.START || batch.isSchemaChanged()) {
+      schemaVersion++;
+    }
+    state = State.RUN;
+    return true;
+  }
+
+  @Override
+  public int schemaVersion() { return schemaVersion; }
+
+  @Override
+  public VectorContainer batch() { return loader.getContainer(); }
+
+  /** Query results carry no selection vector, so this is always {@code null}. */
+  @Override
+  public SelectionVector2 sv2() { return null; }
+
+  /** Clears the loader, discarding the current batch's data. */
+  @Override
+  public void release() {
+    loader.clear();
+  }
+
+  /**
+   * Keeps the current batch's data when {@link #close()} is called.
+   * The flag is reset by the next call to {@link #next()}.
+   */
+  public void retainData() {
+    retainData = true;
+  }
+
+  /**
+   * Releases the current batch (unless retained), then drains any events
+   * still queued for this query.
+   */
+  @Override
+  public void close() {
+    if (!retainData) {
+      release();
+    }
+
+    // Consume any pending input. Skipped batches must be released or their
+    // buffers leak. Stop on ERROR as well as EOF: next() treats ERROR as
+    // terminal, so an EOF event may never follow it.
+    while (state != State.EOF) {
+      QueryEvent event = listener.get();
+      switch (event.type) {
+        case BATCH:
+          if (event.batch != null) {
+            event.batch.release();
+          }
+          break;
+        case EOF:
+        case ERROR:
+          state = State.EOF;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 4828f6c..e451dbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.test;
+import static org.junit.Assert.assertEquals;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -58,11 +60,8 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ScalarReader;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
-import org.apache.drill.test.ClientFixture.StatementParser;
import org.joda.time.Period;
-import static org.junit.Assert.assertEquals;
-
/**
* Builder for a Drill query. Provides all types of query formats,
* and a variety of ways to run the query.
@@ -121,21 +120,18 @@ public class QueryBuilder {
* The future used to wait for the completion of an async query. Returns
* just the summary of the query.
*/
-
public static class QuerySummaryFuture implements Future<QuerySummary> {
/**
* Synchronizes the listener thread and the test thread that
* launched the query.
*/
-
private final CountDownLatch lock = new CountDownLatch(1);
private QuerySummary summary;
/**
* Unsupported at present.
*/
-
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
@@ -144,7 +140,6 @@ public class QueryBuilder {
/**
* Always returns false.
*/
-
@Override
public boolean isCancelled() { return false; }
@@ -160,7 +155,6 @@ public class QueryBuilder {
/**
* Not supported at present, just does a non-timeout get.
*/
-
@Override
public QuerySummary get(long timeout, TimeUnit unit) throws InterruptedException {
return get();
@@ -345,56 +339,32 @@ public class QueryBuilder {
*/
public DirectRowSet rowSet() throws RpcException {
- // Ignore all but the first non-empty batch.
- // Always return the last batch, which may be empty.
-
- QueryDataBatch resultBatch = null;
- for (QueryDataBatch batch : results()) {
- if (resultBatch == null) {
- resultBatch = batch;
- } else if (resultBatch.getHeader().getRowCount() == 0) {
- resultBatch.release();
- resultBatch = batch;
- } else if (batch.getHeader().getRowCount() > 0) {
- throw new IllegalStateException("rowSet() returns a single batch, but this query returned multiple batches. Consider rowSetIterator() instead.");
- } else {
- batch.release();
+ // Keep the first batch that has rows. If no batch has rows, keep the
+ // last batch loaded (zero rows, but with a schema). Null batches (no rows
+ // and no columns) never reach this loop: QueryBatchIterator.next() skips them.
+ // NOTE(review): batches after the first non-empty one are now drained by
+ // close() and discarded, instead of raising IllegalStateException as before.
+ // NOTE(review): query failures now surface from iter.next() as unchecked
+ // exceptions rather than RpcException.
+ VectorContainer batch = null;
+ try (QueryBatchIterator iter = new QueryBatchIterator(client.allocator(), withEventListener())) {
+ while (iter.next()) {
+ // batch() is the iterator's shared container, reloaded on each next().
+ batch = iter.batch();
+ if (batch.getRecordCount() != 0) {
+ // NOTE(review): redundant with the retainData() call after the loop.
+ iter.retainData();
+ break;
+ }
}
+ // Keep the loader's vectors alive past close(): the returned row set wraps
+ // them. NOTE(review): if a null batch follows an empty one, the iterator
+ // clears this same container, so `batch` would reference cleared vectors;
+ // confirm that sequence cannot occur.
+ iter.retainData();
}
-
- // No results?
-
- if (resultBatch == null) {
- return null;
- }
-
- // Unload the batch and convert to a row set.
-
- RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
- loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
- resultBatch.release();
- VectorContainer container = loader.getContainer();
- container.setRecordCount(loader.getRecordCount());
-
- // Null results? Drill will return a single batch with no rows
- // and no columns even if the scan (or other) operator returns
- // no batches at all. For ease of testing, simply map this null
- // result set to a null output row set that says "nothing at all
- // was returned." Note that this is different than an empty result
- // set which has a schema, but no rows.
-
- if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
- container.clear();
+ // No non-null batch arrived at all: report "nothing returned" as null.
+ if (batch == null) {
return null;
+ } else {
+ return DirectRowSet.fromContainer(batch);
}
-
- return DirectRowSet.fromContainer(container);
}
public QueryRowSetIterator rowSetIterator() {
return new QueryRowSetIterator(client.allocator(), withEventListener());
}
+ public QueryRowSetReader rowSetReader() {
+ return QueryRowSetReader.build(client.allocator(), withEventListener());
+ }
+
/**
* Run the query which expect to return vector {@code V} representation
* of type {@code T} for the column {@code columnName}.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index 3a3fe67..d8e2da1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -30,15 +30,14 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
* a very easy way for tests to work with query data using the
* row set tools.
*/
-
public class QueryResultSet {
private final BufferingQueryEventListener listener;
private boolean eof;
- private int recordCount = 0;
- private int batchCount = 0;
- private QueryId queryId = null;
+ private int recordCount;
+ private int batchCount;
+ private QueryId queryId;
@SuppressWarnings("unused")
- private QueryState state = null;
+ private QueryState state;
final RecordBatchLoader loader;
public QueryResultSet(BufferingQueryEventListener listener, BufferAllocator allocator) {
@@ -53,7 +52,6 @@ public class QueryResultSet {
* @return the next batch as a row set, or null if EOF
* @throws Exception on a server error
*/
-
public DirectRowSet next() throws Exception {
if (eof) {
return null;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index acc5056..37e7aeb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -24,74 +24,32 @@ import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetFormatter;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+/**
+ * Converts an incoming set of record batches into an iterator over a
+ * set of row sets. Primarily for testing.
+ */
public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<DirectRowSet> {
- private final BufferingQueryEventListener listener;
- private final BufferAllocator allocator;
- private int recordCount;
- private int batchCount;
- private QueryId queryId;
- private QueryDataBatch batch;
- private QueryState state;
+ private final QueryBatchIterator batchIter;
- QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
- this.allocator = allocator;
- this.listener = listener;
+ public QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
+ batchIter = new QueryBatchIterator(allocator, listener);
}
- public QueryId queryId() { return queryId; }
- public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
- public QueryState finalState() { return state; }
- public int batchCount() { return batchCount; }
- public int rowCount() { return recordCount; }
+ public QueryId queryId() { return batchIter.queryId(); }
+ public String queryIdString() { return batchIter.queryIdString(); }
+ public QueryState finalState() { return batchIter.finalState(); }
+ public int batchCount() { return batchIter.batchCount(); }
+ public int rowCount() { return batchIter.rowCount(); }
@Override
public boolean hasNext() {
- while (true) {
- QueryEvent event = listener.get();
- state = event.state;
- batch = null;
- switch (event.type) {
- case BATCH:
- batchCount++;
- recordCount += event.batch.getHeader().getRowCount();
- batch = event.batch;
- return true;
- case EOF:
- state = event.state;
- return false;
- case ERROR:
- throw new RuntimeException(event.error);
- case QUERY_ID:
- queryId = event.queryId;
- break;
- default:
- throw new IllegalStateException("Unexpected event: " + event.type);
- }
- }
+ // Delegates to QueryBatchIterator.next(), which skips null batches and
+ // rethrows UserException failures unwrapped.
+ // NOTE(review): each call advances to (and loads) the next batch, so
+ // hasNext() is not idempotent; callers must alternate hasNext() and next().
+ return batchIter.next();
}
@Override
public DirectRowSet next() {
-
- if (batch == null) {
- throw new IllegalStateException();
- }
-
- // Unload the batch and convert to a row set.
-
- final RecordBatchLoader loader = new RecordBatchLoader(allocator);
- loader.load(batch.getHeader().getDef(), batch.getData());
- batch.release();
- batch = null;
- VectorContainer container = loader.getContainer();
- container.setRecordCount(loader.getRecordCount());
- return DirectRowSet.fromContainer(container);
+ // The row set wraps the batch iterator's shared container, which the next
+ // hasNext() call reloads; consume each row set before advancing.
+ // NOTE(review): unlike the removed code, there is no check that hasNext()
+ // returned true before this call.
+ return DirectRowSet.fromContainer(batchIter.batch());
}
public void printAll() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java
new file mode 100644
index 0000000..8703f28
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+public class QueryRowSetReader extends PullResultSetReaderImpl {
+
+ private final QueryBatchIterator batchIter;
+
+ public QueryRowSetReader(QueryBatchIterator batchIter) {
+ super(batchIter);
+ this.batchIter = batchIter;
+ }
+
+ public static QueryRowSetReader build(BufferAllocator allocator, BufferingQueryEventListener listener) {
+ return new QueryRowSetReader(new QueryBatchIterator(allocator, listener));
+ }
+
+ public QueryId queryId() { return batchIter.queryId(); }
+ public String queryIdString() { return batchIter.queryIdString(); }
+
+ @Override
+ public void close() {
+ batchIter.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java
new file mode 100644
index 0000000..70c4260
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+/**
+ * Very simple parser for semicolon-separated lists of SQL statements which
+ * handles quoted semicolons. Drill can execute only one statement at a time
+ * (without a trailing semicolon.) This parser breaks up a statement list
+ * into single statements. Input:<pre>
+ * USE a.b;
+ * ALTER SESSION SET `foo` = ";";
+ * SELECT * FROM bar WHERE x = "\";";
+ * </pre>Output:
+ * <ul>
+ * <li>{@code USE a.b}</li>
+ * <li>{@code ALTER SESSION SET `foo` = ";"}</li>
+ * <li>{@code SELECT * FROM bar WHERE x = "\";"}</li>
+ * </ul>
+ * Quoted text may be delimited by double quotes, single quotes or back-ticks.
+ * Inside quotes, a backslash escapes the character that follows it
+ * (including another backslash).
+ */
+public class StatementParser {
+  private final Reader in;
+
+  public StatementParser(Reader in) {
+    this.in = in;
+  }
+
+  public StatementParser(String text) {
+    this(new StringReader(text));
+  }
+
+  /**
+   * Reads the next statement, without its terminating semicolon and with
+   * surrounding whitespace trimmed.
+   *
+   * @return the next statement (an empty string if two semicolons are
+   * adjacent), or {@code null} once the input is exhausted and no text remains
+   * @throws IOException if reading the underlying reader fails
+   * @throws IllegalArgumentException if the input ends inside a quoted string
+   */
+  public String parseNext() throws IOException {
+    boolean eof = false;
+    StringBuilder buf = new StringBuilder();
+    while (true) {
+      int c = in.read();
+      if (c == -1) {
+        eof = true;
+        break;
+      }
+      if (c == ';') {
+        break;
+      }
+      buf.append((char) c);
+      if (c == '"' || c == '\'' || c == '`') {
+        readQuoted(c, buf);
+      }
+    }
+    String stmt = buf.toString().trim();
+    if (stmt.isEmpty() && eof) {
+      return null;
+    }
+    return stmt;
+  }
+
+  /**
+   * Copies the remainder of a quoted string, through its closing quote,
+   * into {@code buf}. Semicolons inside the quotes are kept as text.
+   *
+   * @param quote the opening (and closing) quote character
+   * @param buf buffer receiving the quoted text
+   */
+  private void readQuoted(int quote, StringBuilder buf) throws IOException {
+    boolean escape = false;
+    while (true) {
+      int c = in.read();
+      if (c == -1) {
+        // Report the unmatched quote itself; c is -1 here and is not a character.
+        throw new IllegalArgumentException("Mismatched quote: " + (char) quote);
+      }
+      buf.append((char) c);
+      if (!escape && c == quote) {
+        return;
+      }
+      // A backslash escapes the next character unless it is itself escaped,
+      // so an escaped backslash does not escape a following closing quote.
+      escape = !escape && c == '\\';
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 334e90d..925130f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -35,7 +36,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
-import org.bouncycastle.util.Arrays;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.LocalDate;
@@ -45,7 +45,6 @@ import org.joda.time.Period;
/**
* Various utilities useful for working with row sets, especially for testing.
*/
-
public class RowSetUtilities {
private RowSetUtilities() { }
@@ -55,7 +54,6 @@ public class RowSetUtilities {
* and easy way to reverse the sort order of an expected-value row set.
* @param sv2 the SV2 which is reversed in place
*/
-
public static void reverse(SelectionVector2 sv2) {
int count = sv2.getCount();
for (int i = 0; i < count / 2; i++) {
@@ -164,7 +162,7 @@ public class RowSetUtilities {
byte[] expected = (byte[]) expectedObj;
byte[] actual = (byte[]) actualObj;
assertEquals(msg + " - byte lengths differ", expected.length, actual.length);
- assertTrue(msg, Arrays.areEqual(expected, actual));
+ assertTrue(msg, Arrays.equals(expected, actual));
break;
}
case DOUBLE:
@@ -280,6 +278,10 @@ public class RowSetUtilities {
new RowSetComparison(expected).verifyAndClearAll(actual);
}
+ public static void verify(RowSet expected, RowSet actual, int rowCount) {
+ new RowSetComparison(expected).span(rowCount).verifyAndClearAll(actual);
+ }
+
public static BigDecimal dec(String value) {
return new BigDecimal(value);
}