You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/07/07 20:49:50 UTC
[2/2] calcite git commit: [CALCITE-30] Implement Statement.cancel
[CALCITE-30] Implement Statement.cancel
ResultSet.next() now throws an exception indicating that the statement
has been canceled.
Propagate cancel into streaming CSV table (Zhen Wang).
Use AtomicBoolean for cancel flag; deprecate
RelOptPlanner.setCancelFlag.
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/d9eb4383
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/d9eb4383
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/d9eb4383
Branch: refs/heads/master
Commit: d9eb4383290e6ec7a5bca8d23fa3e03167c699fe
Parents: 65c1cec
Author: Julian Hyde <jh...@apache.org>
Authored: Wed Jun 1 13:07:04 2016 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Jul 6 19:44:17 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/calcite/DataContext.java | 9 +-
.../calcite/jdbc/CalciteConnectionImpl.java | 30 +++-
.../apache/calcite/jdbc/CalciteResultSet.java | 24 +++
.../calcite/plan/AbstractRelOptPlanner.java | 15 +-
.../org/apache/calcite/plan/RelOptPlanner.java | 9 +-
.../apache/calcite/runtime/CalciteResource.java | 3 +
.../org/apache/calcite/util/CancelFlag.java | 23 ++-
.../calcite/runtime/CalciteResource.properties | 1 +
.../org/apache/calcite/test/StreamTest.java | 42 +++++
.../calcite/adapter/csv/CsvEnumerator.java | 41 ++++-
.../calcite/adapter/csv/CsvFilterableTable.java | 4 +-
.../calcite/adapter/csv/CsvScannableTable.java | 4 +-
.../calcite/adapter/csv/CsvSchemaFactory.java | 12 +-
.../adapter/csv/CsvStreamEnumerator.java | 96 ----------
.../calcite/adapter/csv/CsvStreamReader.java | 76 ++++----
.../adapter/csv/CsvStreamScannableTable.java | 8 +-
.../calcite/adapter/csv/CsvTableScan.java | 3 +-
.../adapter/csv/CsvTranslatableTable.java | 8 +-
.../java/org/apache/calcite/test/CsvTest.java | 173 ++++++++++++++++++-
19 files changed, 398 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/DataContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/DataContext.java b/core/src/main/java/org/apache/calcite/DataContext.java
index 3d5203f..78642aa 100644
--- a/core/src/main/java/org/apache/calcite/DataContext.java
+++ b/core/src/main/java/org/apache/calcite/DataContext.java
@@ -27,6 +27,7 @@ import com.google.common.base.CaseFormat;
import java.lang.reflect.Modifier;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Runtime context allowing access to the tables in a database.
@@ -76,7 +77,13 @@ public interface DataContext {
/** The Spark engine. Available if Spark is on the class path. */
SPARK_CONTEXT("sparkContext", Object.class),
- /** Sql advisor that suggests completion hints. */
+ /** A mutable flag that indicates whether the user has requested that the
+ * current statement be canceled. Cancellation may not be immediate, but
+ * implementations of relational operators should check the flag fairly
+ * frequently and cease execution (e.g. by returning end of data). */
+ CANCEL_FLAG("cancelFlag", AtomicBoolean.class),
+
+ /** Advisor that suggests completion hints for SQL statements. */
SQL_ADVISOR("sqlAdvisor", SqlAdvisor.class),
/** Time zone in which the current statement is executing. Required;
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
index 61a8d28..195ced7 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
@@ -76,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of JDBC connection
@@ -197,7 +198,6 @@ abstract class CalciteConnectionImpl
final CalcitePreparedStatement calcitePreparedStatement =
(CalcitePreparedStatement) factory.newPreparedStatement(this, null,
signature, resultSetType, resultSetConcurrency, resultSetHoldability);
- server.addStatement(this, calcitePreparedStatement.handle);
server.getStatement(calcitePreparedStatement.handle).setSignature(signature);
return calcitePreparedStatement;
} catch (Exception e) {
@@ -278,10 +278,25 @@ abstract class CalciteConnectionImpl
map.put("?" + o.i, o.e.toLocal());
}
map.putAll(signature.internalParameters);
+ final AtomicBoolean cancelFlag;
+ try {
+ cancelFlag = getCancelFlag(handle);
+ } catch (NoSuchStatementException e) {
+ throw Throwables.propagate(e);
+ }
+ map.put(DataContext.Variable.CANCEL_FLAG.camelName, cancelFlag);
final DataContext dataContext = createDataContext(map);
return signature.enumerable(dataContext);
}
+ /** Returns the flag that is used to request or check cancel for a particular
+ * statement. */
+ AtomicBoolean getCancelFlag(Meta.StatementHandle handle)
+ throws NoSuchStatementException {
+ final CalciteServerStatement serverStatement = server.getStatement(handle);
+ return ((CalciteServerStatementImpl) serverStatement).cancelFlag;
+ }
+
public DataContext createDataContext(Map<String, Object> parameterValues) {
if (config().spark()) {
return new SlimDataContext();
@@ -301,7 +316,7 @@ abstract class CalciteConnectionImpl
/** Implementation of Queryable. */
static class CalciteQueryable<T> extends BaseQueryable<T> {
- public CalciteQueryable(CalciteConnection connection, Type elementType,
+ CalciteQueryable(CalciteConnection connection, Type elementType,
Expression expression) {
super(connection, elementType, expression);
}
@@ -322,7 +337,11 @@ abstract class CalciteConnectionImpl
public void addStatement(CalciteConnection connection,
Meta.StatementHandle h) {
final CalciteConnectionImpl c = (CalciteConnectionImpl) connection;
- statementMap.put(h.id, new CalciteServerStatementImpl(c));
+ final CalciteServerStatement previous =
+ statementMap.put(h.id, new CalciteServerStatementImpl(c));
+ if (previous != null) {
+ throw new AssertionError();
+ }
}
public CalciteServerStatement getStatement(Meta.StatementHandle h)
@@ -432,7 +451,7 @@ abstract class CalciteConnectionImpl
static class ContextImpl implements CalcitePrepare.Context {
private final CalciteConnectionImpl connection;
- public ContextImpl(CalciteConnectionImpl connection) {
+ ContextImpl(CalciteConnectionImpl connection) {
this.connection = Preconditions.checkNotNull(connection);
}
@@ -491,8 +510,9 @@ abstract class CalciteConnectionImpl
private final CalciteConnectionImpl connection;
private Iterator<Object> iterator;
private Meta.Signature signature;
+ private final AtomicBoolean cancelFlag = new AtomicBoolean();
- public CalciteServerStatementImpl(CalciteConnectionImpl connection) {
+ CalciteServerStatementImpl(CalciteConnectionImpl connection) {
this.connection = Preconditions.checkNotNull(connection);
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
index c45b245..e62417a 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
@@ -22,6 +22,7 @@ import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Handler;
import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.avatica.util.Cursor;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
@@ -29,6 +30,7 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.runtime.ArrayEnumeratorCursor;
import org.apache.calcite.runtime.ObjectEnumeratorCursor;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.sql.ResultSet;
@@ -36,17 +38,27 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.calcite.util.Static.RESOURCE;
/**
* Implementation of {@link ResultSet}
* for the Calcite engine.
*/
public class CalciteResultSet extends AvaticaResultSet {
+ private final AtomicBoolean cancelFlag;
+
CalciteResultSet(AvaticaStatement statement,
CalcitePrepare.CalciteSignature calciteSignature,
ResultSetMetaData resultSetMetaData, TimeZone timeZone,
Meta.Frame firstFrame) {
super(statement, null, calciteSignature, resultSetMetaData, timeZone, firstFrame);
+ try {
+ cancelFlag = getCalciteConnection().getCancelFlag(statement.handle);
+ } catch (NoSuchStatementException e) {
+ throw Throwables.propagate(e);
+ }
}
@Override protected CalciteResultSet execute() throws SQLException {
@@ -66,6 +78,18 @@ public class CalciteResultSet extends AvaticaResultSet {
return this;
}
+ @Override protected void cancel() {
+ cancelFlag.compareAndSet(false, true);
+ }
+
+ @Override public boolean next() throws SQLException {
+ final boolean next = super.next();
+ if (cancelFlag.get()) {
+ throw new SQLException(RESOURCE.statementCanceled().str());
+ }
+ return next;
+ }
+
@Override public ResultSet create(ColumnMetaData.AvaticaType elementType,
Iterable<Object> iterable) {
final List<ColumnMetaData> columnMetaDataList;
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java b/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
index e8f7578..875ecd7 100644
--- a/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.apache.calcite.util.Static.RESOURCE;
@@ -60,7 +61,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
private Pattern ruleDescExclusionFilter;
- private CancelFlag cancelFlag;
+ private final AtomicBoolean cancelFlag;
private final Set<Class<? extends RelNode>> classes = new HashSet<>();
@@ -76,7 +77,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
/**
* Creates an AbstractRelOptPlanner.
*/
- protected AbstractRelOptPlanner(RelOptCostFactory costFactory, //
+ protected AbstractRelOptPlanner(RelOptCostFactory costFactory,
Context context) {
assert costFactory != null;
this.costFactory = costFactory;
@@ -85,9 +86,9 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
}
this.context = context;
- // In case no one calls setCancelFlag, set up a
- // dummy here.
- cancelFlag = new CancelFlag();
+ final CancelFlag cancelFlag = context.unwrap(CancelFlag.class);
+ this.cancelFlag = cancelFlag != null ? cancelFlag.atomicBoolean
+ : new AtomicBoolean();
// Add abstract RelNode classes. No RelNodes will ever be registered with
// these types, but some operands may use them.
@@ -108,7 +109,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
}
public void setCancelFlag(CancelFlag cancelFlag) {
- this.cancelFlag = cancelFlag;
+ // ignored
}
/**
@@ -116,7 +117,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
* an exception.
*/
public void checkCancel() {
- if (cancelFlag.isCancelRequested()) {
+ if (cancelFlag.get()) {
throw RESOURCE.preparationAborted().ex();
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java b/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
index 58f4449..f4ba090 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
@@ -122,12 +122,15 @@ public interface RelOptPlanner {
void setRuleDescExclusionFilter(Pattern exclusionFilter);
/**
- * Installs the cancellation-checking flag for this planner. The planner
- * should periodically check this flag and terminate the planning process if
- * it sees a cancellation request.
+ * Does nothing.
+ *
+ * @deprecated Previously, this method installed the cancellation-checking
+ * flag for this planner; it now does nothing. Instead, add a
+ * {@link CancelFlag} to the {@link Context} passed to the constructor.
*
* @param cancelFlag flag which the planner should periodically check
*/
+ @Deprecated // to be removed before 2.0
void setCancelFlag(CancelFlag cancelFlag);
/**
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 80ce001..b936859 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -615,6 +615,9 @@ public interface CalciteResource {
@BaseMessage("SELECT must have a FROM clause")
ExInst<SqlValidatorException> selectMissingFrom();
+
+ @BaseMessage("Statement canceled")
+ Inst statementCanceled();
}
// End CalciteResource.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/util/CancelFlag.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/CancelFlag.java b/core/src/main/java/org/apache/calcite/util/CancelFlag.java
index b1f57b7..61e678b 100644
--- a/core/src/main/java/org/apache/calcite/util/CancelFlag.java
+++ b/core/src/main/java/org/apache/calcite/util/CancelFlag.java
@@ -16,13 +16,28 @@
*/
package org.apache.calcite.util;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptPlanner;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* CancelFlag is used to post and check cancellation requests.
+ *
+ * <p>Pass it to {@link RelOptPlanner} by putting it into a {@link Context}.
*/
public class CancelFlag {
//~ Instance fields --------------------------------------------------------
- private boolean cancelRequested;
+ /** The flag that holds the cancel state.
+ * Feel free to use the flag directly. */
+ public final AtomicBoolean atomicBoolean;
+
+ public CancelFlag(AtomicBoolean atomicBoolean) {
+ this.atomicBoolean = Preconditions.checkNotNull(atomicBoolean);
+ }
//~ Methods ----------------------------------------------------------------
@@ -30,21 +45,21 @@ public class CancelFlag {
* @return whether a cancellation has been requested
*/
public boolean isCancelRequested() {
- return cancelRequested;
+ return atomicBoolean.get();
}
/**
* Requests a cancellation.
*/
public void requestCancel() {
- cancelRequested = true;
+ atomicBoolean.compareAndSet(false, true);
}
/**
* Clears any pending cancellation request.
*/
public void clearCancel() {
- cancelRequested = false;
+ atomicBoolean.compareAndSet(true, false);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 3573f00..bab7a4a 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -201,4 +201,5 @@ TableNotFound=Table ''{0}'' not found
FilterMustBeBoolean=FILTER expression must be of type BOOLEAN
CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream
SelectMissingFrom=SELECT must have a FROM clause
+StatementCanceled=Statement canceled
# End CalciteResource.properties
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 6d79b1d..6b6e225 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -51,7 +51,10 @@ import java.util.Iterator;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for streaming queries.
@@ -234,6 +237,45 @@ public class StreamTest {
.returnsCount(100);
}
+ @Test(timeout = 10000) public void testStreamCancel() {
+ final String explain = "EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[INFINITE_STREAMS, ORDERS, (STREAM)]])";
+ CalciteAssert.model(STREAM_MODEL)
+ .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+ .query("select stream * from orders")
+ .explainContains(explain)
+ .returns(
+ new Function<ResultSet, Void>() {
+ public Void apply(final ResultSet resultSet) {
+ int n = 0;
+ try {
+ while (resultSet.next()) {
+ if (++n == 5) {
+ new Thread(
+ new Runnable() {
+ @Override public void run() {
+ try {
+ Thread.sleep(3);
+ resultSet.getStatement().cancel();
+ } catch (InterruptedException | SQLException e) {
+ // ignore
+ }
+ }
+ }).start();
+ }
+ }
+ fail("expected cancel, got end-of-data");
+ } catch (SQLException e) {
+ assertThat(e.getMessage(), is("Statement canceled"));
+ }
+ // With a 3 millisecond delay, typically n is between 200 and 400
+ // before cancel takes effect.
+ assertTrue("n is " + n, n > 5);
+ return null;
+ }
+ });
+ }
+
@Test public void testStreamToRelationJoin() {
CalciteAssert.model(STREAM_JOINS_MODEL)
.withDefaultSchema(STREAM_JOINS_SCHEMA_NAME)
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
index 5b942fa..716992c 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.time.FastDateFormat;
import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.base.Throwables;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
@@ -37,6 +39,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
@@ -47,6 +50,7 @@ import java.util.zip.GZIPInputStream;
class CsvEnumerator<E> implements Enumerator<E> {
private final CSVReader reader;
private final String[] filterValues;
+ private final AtomicBoolean cancelFlag;
private final RowConverter<E> rowConverter;
private E current;
@@ -62,21 +66,29 @@ class CsvEnumerator<E> implements Enumerator<E> {
FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
}
- public CsvEnumerator(File file, List<CsvFieldType> fieldTypes) {
- this(file, fieldTypes, identityList(fieldTypes.size()));
+ public CsvEnumerator(File file, AtomicBoolean cancelFlag,
+ List<CsvFieldType> fieldTypes) {
+ this(file, cancelFlag, fieldTypes, identityList(fieldTypes.size()));
}
- public CsvEnumerator(File file, List<CsvFieldType> fieldTypes, int[] fields) {
+ public CsvEnumerator(File file, AtomicBoolean cancelFlag,
+ List<CsvFieldType> fieldTypes, int[] fields) {
//noinspection unchecked
- this(file, null, (RowConverter<E>) converter(fieldTypes, fields));
+ this(file, cancelFlag, false, null,
+ (RowConverter<E>) converter(fieldTypes, fields));
}
- public CsvEnumerator(File file, String[] filterValues,
- RowConverter<E> rowConverter) {
+ public CsvEnumerator(File file, AtomicBoolean cancelFlag, boolean stream,
+ String[] filterValues, RowConverter<E> rowConverter) {
+ this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues;
try {
- this.reader = openCsv(file);
+ if (stream) {
+ this.reader = new CsvStreamReader(file);
+ } else {
+ this.reader = openCsv(file);
+ }
this.reader.readNext(); // skip header row
} catch (IOException e) {
throw new RuntimeException(e);
@@ -94,7 +106,7 @@ class CsvEnumerator<E> implements Enumerator<E> {
}
static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
- List<CsvFieldType> fieldTypes) {
+ List<CsvFieldType> fieldTypes) {
return deduceRowType(typeFactory, file, fieldTypes, false);
}
@@ -106,7 +118,7 @@ class CsvEnumerator<E> implements Enumerator<E> {
final List<String> names = new ArrayList<>();
CSVReader reader = null;
if (stream) {
- names.add("ROWTIME");
+ names.add(CsvSchemaFactory.ROWTIME_COLUMN_NAME);
types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
}
try {
@@ -180,8 +192,19 @@ class CsvEnumerator<E> implements Enumerator<E> {
try {
outer:
for (;;) {
+ if (cancelFlag.get()) {
+ return false;
+ }
final String[] strings = reader.readNext();
if (strings == null) {
+ if (reader instanceof CsvStreamReader) {
+ try {
+ Thread.sleep(CsvStreamReader.DEFAULT_MONITOR_DELAY);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ continue;
+ }
current = null;
reader.close();
return false;
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
index 4ca26f7..593de5a 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
@@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlKind;
import java.io.File;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Table based on a CSV file that can implement simple filtering.
@@ -58,9 +59,10 @@ public class CsvFilterableTable extends CsvTable
}
}
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+ final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
- return new CsvEnumerator<Object[]>(file, filterValues,
+ return new CsvEnumerator<>(file, cancelFlag, false, filterValues,
new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
}
};
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
index 005d15b..5078afe 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.ScannableTable;
import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Table based on a CSV file.
@@ -44,9 +45,10 @@ public class CsvScannableTable extends CsvTable
public Enumerable<Object[]> scan(DataContext root) {
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+ final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
- return new CsvEnumerator<Object[]>(file,
+ return new CsvEnumerator<>(file, cancelFlag, false,
null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
}
};
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
index 9f3c2bd..44520c1 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
@@ -28,12 +28,18 @@ import java.util.Map;
* Factory that creates a {@link CsvSchema}.
*
* <p>Allows a custom schema to be included in a <code><i>model</i>.json</code>
- * file.</p>
+ * file.
*/
@SuppressWarnings("UnusedDeclaration")
public class CsvSchemaFactory implements SchemaFactory {
- // public constructor, per factory contract
- public CsvSchemaFactory() {
+ /** Name of the column that is implicitly created in a CSV stream table
+ * to hold the data arrival time. */
+ static final String ROWTIME_COLUMN_NAME = "ROWTIME";
+
+ /** Public singleton, per factory contract. */
+ public static final CsvSchemaFactory INSTANCE = new CsvSchemaFactory();
+
+ private CsvSchemaFactory() {
}
public Schema create(SchemaPlus parentSchema, String name,
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
deleted file mode 100644
index 4609f11..0000000
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.linq4j.Enumerator;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Csv Streaming enumerator
- * @param <E> Row type
- */
-public class CsvStreamEnumerator<E> implements Enumerator<E> {
- protected CsvStreamReader streamReader;
- protected String[] filterValues;
- protected CsvEnumerator.RowConverter<E> rowConverter;
- protected E current;
-
- public CsvStreamEnumerator(File file, String[] filterValues,
- CsvEnumerator.RowConverter<E> rowConverter) {
- this.rowConverter = rowConverter;
- this.filterValues = filterValues;
- try {
- this.streamReader = new CsvStreamReader(file);
- this.streamReader.readNext(); // skip header row
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public boolean moveNext() {
- return true;
- }
-
- public E readNext() {
- try {
- outer:
- for (;;) {
- final String[] strings = streamReader.readNext();
- if (strings == null) {
- current = null;
- streamReader.close();
- return current;
- } else {
- if (filterValues != null) {
- for (int i = 0; i < strings.length; i++) {
- String filterValue = filterValues[i];
- if (filterValue != null) {
- if (!filterValue.equals(strings[i])) {
- continue outer;
- }
- }
- }
- }
- current = rowConverter.convertRow(strings);
- return current;
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override public E current() {
- return readNext();
- }
-
- @Override public void close() {
- try {
- streamReader.close();
- } catch (IOException e) {
- throw new RuntimeException("Error closing Csv Stream reader", e);
- }
- }
-
- @Override public void reset() {
- throw new UnsupportedOperationException();
- }
-}
-
-// End CsvStreamEnumerator.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
index 5f2cf19..e97278a 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
@@ -21,17 +21,21 @@ import org.apache.commons.io.input.TailerListener;
import org.apache.commons.io.input.TailerListenerAdapter;
import au.com.bytecode.opencsv.CSVParser;
+import au.com.bytecode.opencsv.CSVReader;
+
+import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.Queue;
/**
- * CSVSreamReader that can read newly appended file content
+ * Extension to {@link CSVReader} that can read newly appended file content.
*/
-public class CsvStreamReader implements Closeable {
+class CsvStreamReader extends CSVReader implements Closeable {
protected CSVParser parser;
protected int skipLines;
protected Tailer tailer;
@@ -47,48 +51,44 @@ public class CsvStreamReader implements Closeable {
*/
public static final long DEFAULT_MONITOR_DELAY = 2000;
- public CsvStreamReader(File csvFile) {
- this(
- csvFile,
+ CsvStreamReader(File csvFile) {
+ this(csvFile,
CSVParser.DEFAULT_SEPARATOR,
CSVParser.DEFAULT_QUOTE_CHARACTER,
CSVParser.DEFAULT_ESCAPE_CHARACTER,
DEFAULT_SKIP_LINES,
CSVParser.DEFAULT_STRICT_QUOTES,
- CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE
- );
+ CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE);
}
/**
- * Constructs CSVReader with supplied separator and quote char.
+ * Creates a CsvStreamReader with supplied separator and quote char.
*
- * @param csvFile the file to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param line the line number to skip for start reading
- * @param strictQuotes sets if characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace it true, parser should ignore
+ * @param csvFile The file that is the underlying CSV source.
+ * @param separator The delimiter to use for separating entries
+ * @param quoteChar The character to use for quoted elements
+ * @param escape The character to use for escaping a separator or quote
+ * @param line The number of lines to skip before reading starts
+ * @param strictQuotes Whether characters outside the quotes are ignored
+ * @param ignoreLeadingWhiteSpace If true, parser should ignore
* white space before a quote in a field
*/
- public CsvStreamReader(File csvFile, char separator, char quotechar, char escape, int line,
- boolean strictQuotes, boolean ignoreLeadingWhiteSpace) {
- contentQueue = new ArrayDeque<String>();
- TailerListener listener = new CSVContentListener(contentQueue);
- tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false, true, 4096);
- this.parser = new CSVParser(
- separator,
- quotechar,
- escape,
- strictQuotes,
- ignoreLeadingWhiteSpace
- );
+ private CsvStreamReader(File csvFile, char separator, char quoteChar,
+ char escape, int line, boolean strictQuotes,
+ boolean ignoreLeadingWhiteSpace) {
+ super(new StringReader("")); // dummy call to base constructor
+ contentQueue = new ArrayDeque<>();
+ TailerListener listener = new CsvContentListener(contentQueue);
+ tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false,
+ true, 4096);
+ this.parser = new CSVParser(separator, quoteChar, escape, strictQuotes,
+ ignoreLeadingWhiteSpace);
this.skipLines = line;
try {
- //wait for tailer to capture data
+ // wait for tailer to capture data
Thread.sleep(DEFAULT_MONITOR_DELAY);
} catch (InterruptedException e) {
- //ignore the interruption
+ throw Throwables.propagate(e);
}
}
@@ -100,17 +100,11 @@ public class CsvStreamReader implements Closeable {
* @throws IOException if bad things happen during the read
*/
public String[] readNext() throws IOException {
-
String[] result = null;
do {
String nextLine = getNextLine();
- while (nextLine == null) {
- try {
- Thread.sleep(DEFAULT_MONITOR_DELAY);
- nextLine = getNextLine();
- } catch (InterruptedException e) {
- return null; // should throw if still pending?
- }
+ if (nextLine == null) {
+ return null;
}
String[] r = parser.parseLineMulti(nextLine);
if (r.length > 0) {
@@ -146,11 +140,11 @@ public class CsvStreamReader implements Closeable {
public void close() throws IOException {
}
- /** csv file content watcher*/
- class CSVContentListener extends TailerListenerAdapter {
- Queue<String> contentQueue;
+ /** Watches for content being appended to a CSV file. */
+ private static class CsvContentListener extends TailerListenerAdapter {
+ final Queue<String> contentQueue;
- CSVContentListener(Queue<String> contentQueue) {
+ CsvContentListener(Queue<String> contentQueue) {
this.contentQueue = contentQueue;
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
index 752f8a7..72af281 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.schema.Table;
import java.io.File;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Table based on a CSV file.
@@ -49,7 +50,7 @@ public class CsvStreamScannableTable extends CsvScannableTable
return protoRowType.apply(typeFactory);
}
if (fieldTypes == null) {
- fieldTypes = new ArrayList<CsvFieldType>();
+ fieldTypes = new ArrayList<>();
return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, fieldTypes, true);
} else {
return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, null, true);
@@ -62,10 +63,11 @@ public class CsvStreamScannableTable extends CsvScannableTable
public Enumerable<Object[]> scan(DataContext root) {
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+ final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
- return new CsvStreamEnumerator<Object[]>(file,
- null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true));
+ return new CsvEnumerator<>(file, cancelFlag, true, null,
+ new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true));
}
};
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
index 8a53e84..5165aab 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
@@ -97,7 +97,8 @@ public class CsvTableScan extends TableScan implements EnumerableRel {
physType,
Blocks.toBlock(
Expressions.call(table.getExpression(CsvTranslatableTable.class),
- "project", Expressions.constant(fields))));
+ "project", implementor.getRootExpression(),
+ Expressions.constant(fields))));
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
index 5ae9880..8970e28 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.csv;
+import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
@@ -32,6 +33,7 @@ import org.apache.calcite.schema.TranslatableTable;
import java.io.File;
import java.lang.reflect.Type;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Table based on a CSV file.
@@ -50,10 +52,12 @@ public class CsvTranslatableTable extends CsvTable
/** Returns an enumerable over a given projection of the fields.
*
* <p>Called from generated code. */
- public Enumerable<Object> project(final int[] fields) {
+ public Enumerable<Object> project(final DataContext root,
+ final int[] fields) {
+ final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object>() {
public Enumerator<Object> enumerator() {
- return new CsvEnumerator<Object>(file, fieldTypes, fields);
+ return new CsvEnumerator<>(file, cancelFlag, fieldTypes, fields);
}
};
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
----------------------------------------------------------------------
diff --git a/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java b/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
index 173994b..353758a 100644
--- a/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
+++ b/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
@@ -17,10 +17,12 @@
package org.apache.calcite.test;
import org.apache.calcite.adapter.csv.CsvSchemaFactory;
+import org.apache.calcite.adapter.csv.CsvStreamTableFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.util.Util;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@@ -29,7 +31,10 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
+import java.io.FileWriter;
import java.io.PrintStream;
+import java.io.PrintWriter;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -40,11 +45,18 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* Unit test of the Calcite adapter for CSV.
@@ -67,10 +79,6 @@ public class CsvTest {
}
}
- public static String toLinux(String s) {
- return s.replaceAll("\r\n", "\n");
- }
-
/**
* Tests the vanity driver.
*/
@@ -302,7 +310,7 @@ public class CsvTest {
.append(resultSet.getString(i));
sep = "; ";
}
- result.add(toLinux(buf.toString()));
+ result.add(Util.toLinux(buf.toString()));
}
}
@@ -488,7 +496,7 @@ public class CsvTest {
CalciteConnection.class);
final Schema schema =
- new CsvSchemaFactory()
+ CsvSchemaFactory.INSTANCE
.create(calciteConnection.getRootSchema(), null,
ImmutableMap.<String, Object>of("directory",
resourcePath("sales"), "flavor", "scannable"));
@@ -503,6 +511,159 @@ public class CsvTest {
expect.apply(resultSet1);
}
}
+
+ @Test(timeout = 10000) public void testCsvStream() throws Exception {
+ final File file = File.createTempFile("stream", "csv");
+ final String model = "{\n"
+ + " version: '1.0',\n"
+ + " defaultSchema: 'STREAM',\n"
+ + " schemas: [\n"
+ + " {\n"
+ + " name: 'SS',\n"
+ + " tables: [\n"
+ + " {\n"
+ + " name: 'DEPTS',\n"
+ + " type: 'custom',\n"
+ + " factory: '" + CsvStreamTableFactory.class.getName()
+ + "',\n"
+ + " stream: {\n"
+ + " stream: true\n"
+ + " },\n"
+ + " operand: {\n"
+ + " file: '" + file.getAbsolutePath() + "',\n"
+ + " flavor: \"scannable\"\n"
+ + " }\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + "}\n";
+ final String[] strings = {
+ "DEPTNO:int,NAME:string",
+ "10,\"Sales\"",
+ "20,\"Marketing\"",
+ "30,\"Engineering\""
+ };
+
+ try (final Connection connection =
+ DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
+ final PrintWriter pw = new PrintWriter(new FileWriter(file));
+ final Worker<Void> worker = new Worker<>()) {
+ final Thread thread = new Thread(worker);
+ thread.start();
+
+ // Add some rows so that the table can deduce its row type.
+ final Iterator<String> lines = Arrays.asList(strings).iterator();
+ worker.queue.put(writeLine(pw, lines.next())); // header
+ worker.queue.put(writeLine(pw, lines.next())); // first row
+ worker.queue.put(sleep(10));
+ worker.queue.put(writeLine(pw, lines.next())); // second row
+ final CalciteConnection calciteConnection =
+ connection.unwrap(CalciteConnection.class);
+ final String sql = "select stream * from \"SS\".\"DEPTS\"";
+ final PreparedStatement statement =
+ calciteConnection.prepareStatement(sql);
+ final ResultSet resultSet = statement.executeQuery();
+ int count = 0;
+ try {
+ while (resultSet.next()) {
+ ++count;
+ if (lines.hasNext()) {
+ worker.queue.put(sleep(10));
+ worker.queue.put(writeLine(pw, lines.next()));
+ } else {
+ worker.queue.put(cancel(statement));
+ }
+ }
+ fail("expected exception, got end of data");
+ } catch (SQLException e) {
+ assertThat(e.getMessage(), is("Statement canceled"));
+ }
+ assertThat(count, anyOf(is(strings.length - 2), is(strings.length - 1)));
+ assertThat(worker.e, nullValue());
+ assertThat(worker.v, nullValue());
+ } finally {
+ Util.discard(file.delete());
+ }
+ }
+
+ /** Creates a command that appends a line to the CSV file. */
+ private Callable<Void> writeLine(final PrintWriter pw, final String line) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ pw.println(line);
+ pw.flush();
+ return null;
+ }
+ };
+ }
+
+ /** Creates a command that sleeps. */
+ private Callable<Void> sleep(final long millis) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.sleep(millis);
+ return null;
+ }
+ };
+ }
+
+ /** Creates a command that cancels a statement. */
+ private Callable<Void> cancel(final Statement statement) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ statement.cancel();
+ return null;
+ }
+ };
+ }
+
+ /** Receives commands on a queue and executes them on its own thread.
+ * Call {@link #close} to terminate.
+ *
+ * @param <E> Result value of commands
+ */
+ private static class Worker<E> implements Runnable, AutoCloseable {
+ /** Queue of commands. */
+ final BlockingQueue<Callable<E>> queue =
+ new ArrayBlockingQueue<>(5);
+
+ /** Value returned by the most recent command. */
+ private E v;
+
+ /** Exception thrown by a command or queue wait. */
+ private Exception e;
+
+ /** The poison pill command. */
+ final Callable<E> end =
+ new Callable<E>() {
+ public E call() {
+ return null;
+ }
+ };
+
+ public void run() {
+ try {
+ for (;;) {
+ final Callable<E> c = queue.take();
+ if (c == end) {
+ return;
+ }
+ this.v = c.call();
+ }
+ } catch (Exception e) {
+ this.e = e;
+ }
+ }
+
+ public void close() {
+ try {
+ queue.put(end);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
}
// End CsvTest.java