You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/06/05 22:22:51 UTC
[10/12] calcite git commit: [CALCITE-1239] Upgrade to avatica-1.8.0
[CALCITE-1239] Upgrade to avatica-1.8.0
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/eb02ac3f
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/eb02ac3f
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/eb02ac3f
Branch: refs/heads/master
Commit: eb02ac3fcef9deb2b16668f5dc06c9a89c0e658e
Parents: e25ceef
Author: Julian Hyde <jh...@apache.org>
Authored: Mon May 23 08:47:30 2016 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Jun 4 21:35:29 2016 -0700
----------------------------------------------------------------------
.../config/CalciteConnectionProperty.java | 4 +-
.../calcite/jdbc/CalciteConnectionImpl.java | 10 ++-
.../apache/calcite/jdbc/CalciteMetaImpl.java | 94 ++++++++++++++++++--
.../apache/calcite/jdbc/CalciteStatement.java | 17 +++-
.../apache/calcite/server/CalciteServer.java | 10 ++-
.../calcite/jdbc/CalciteRemoteDriverTest.java | 32 +++++++
pom.xml | 2 +-
7 files changed, 154 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
index 7bf157d..b63083c 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
@@ -18,7 +18,6 @@ package org.apache.calcite.config;
import org.apache.calcite.avatica.ConnectionProperty;
import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.util.Bug;
import java.util.HashMap;
import java.util.Map;
@@ -48,8 +47,7 @@ public enum CalciteConnectionProperty implements ConnectionProperty {
/** How many rows the Druid adapter should fetch at a time when executing
* "select" queries. */
- DRUID_FETCH("druidFetch", Type.STRING, "16384",
- Bug.upgrade("convert to Type.NUMBER after [CALCITE-1207]")),
+ DRUID_FETCH("druidFetch", Type.NUMBER, 16384, false),
/** URI of the model. */
MODEL("model", Type.STRING, null, false),
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
index 4319fe0..61a8d28 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
@@ -26,6 +26,7 @@ import org.apache.calcite.avatica.Helper;
import org.apache.calcite.avatica.InternalProperty;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -324,8 +325,13 @@ abstract class CalciteConnectionImpl
statementMap.put(h.id, new CalciteServerStatementImpl(c));
}
- public CalciteServerStatement getStatement(Meta.StatementHandle h) {
- return statementMap.get(h.id);
+ public CalciteServerStatement getStatement(Meta.StatementHandle h)
+ throws NoSuchStatementException {
+ CalciteServerStatement statement = statementMap.get(h.id);
+ if (statement == null) {
+ throw new NoSuchStatementException(h);
+ }
+ return statement;
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index fec7346..434b1d4 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -57,6 +57,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
import java.lang.reflect.Field;
import java.sql.Connection;
@@ -151,7 +153,13 @@ public class CalciteMetaImpl extends MetaImpl {
@Override public void closeStatement(StatementHandle h) {
final CalciteConnectionImpl calciteConnection = getConnection();
- CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
+ final CalciteServerStatement stmt;
+ try {
+ stmt = calciteConnection.server.getStatement(h);
+ } catch (NoSuchStatementException e) {
+ // statement is not valid; nothing to do
+ return;
+ }
// stmt.close(); // TODO: implement
calciteConnection.server.removeStatement(h);
}
@@ -553,7 +561,13 @@ public class CalciteMetaImpl extends MetaImpl {
final StatementHandle h = createStatement(ch);
final CalciteConnectionImpl calciteConnection = getConnection();
- CalciteServerStatement statement = calciteConnection.server.getStatement(h);
+ final CalciteServerStatement statement;
+ try {
+ statement = calciteConnection.server.getStatement(h);
+ } catch (NoSuchStatementException e) {
+ // Not possible. We just created a statement.
+ throw new AssertionError("missing statement", e);
+ }
h.signature =
calciteConnection.parseQuery(CalcitePrepare.Query.of(sql),
statement.createPrepareContext(), maxRowCount);
@@ -562,7 +576,14 @@ public class CalciteMetaImpl extends MetaImpl {
}
@Override public ExecuteResult prepareAndExecute(StatementHandle h,
- String sql, long maxRowCount, PrepareCallback callback) {
+ String sql, long maxRowCount, PrepareCallback callback)
+ throws NoSuchStatementException {
+ return prepareAndExecute(h, sql, maxRowCount, -1, callback);
+ }
+
+ @Override public ExecuteResult prepareAndExecute(StatementHandle h,
+ String sql, long maxRowCount, int maxRowsInFirstFrame,
+ PrepareCallback callback) throws NoSuchStatementException {
final CalcitePrepare.CalciteSignature<Object> signature;
try {
synchronized (callback.getMonitor()) {
@@ -585,7 +606,8 @@ public class CalciteMetaImpl extends MetaImpl {
// TODO: share code with prepare and createIterable
}
- @Override public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) {
+ @Override public Frame fetch(StatementHandle h, long offset,
+ int fetchMaxRowCount) throws NoSuchStatementException {
final CalciteConnectionImpl calciteConnection = getConnection();
CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
final Signature signature = stmt.getSignature();
@@ -608,7 +630,14 @@ public class CalciteMetaImpl extends MetaImpl {
}
@Override public ExecuteResult execute(StatementHandle h,
- List<TypedValue> parameterValues, long maxRowCount) {
+ List<TypedValue> parameterValues, long maxRowCount)
+ throws NoSuchStatementException {
+ return execute(h, parameterValues, Ints.saturatedCast(maxRowCount));
+ }
+
+ @Override public ExecuteResult execute(StatementHandle h,
+ List<TypedValue> parameterValues, int maxRowsInFirstFrame)
+ throws NoSuchStatementException {
final CalciteConnectionImpl calciteConnection = getConnection();
CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
final Signature signature = stmt.getSignature();
@@ -633,6 +662,61 @@ public class CalciteMetaImpl extends MetaImpl {
return new ExecuteResult(ImmutableList.of(metaResultSet));
}
+ @Override public ExecuteBatchResult executeBatch(StatementHandle h,
+ List<List<TypedValue>> parameterValueLists) throws NoSuchStatementException {
+ final List<Long> updateCounts = new ArrayList<>();
+ for (List<TypedValue> parameterValueList : parameterValueLists) {
+ ExecuteResult executeResult = execute(h, parameterValueList, -1);
+ final long updateCount =
+ executeResult.resultSets.size() == 1
+ ? executeResult.resultSets.get(0).updateCount
+ : -1L;
+ updateCounts.add(updateCount);
+ }
+ return new ExecuteBatchResult(Longs.toArray(updateCounts));
+ }
+
+ @Override public ExecuteBatchResult prepareAndExecuteBatch(
+ final StatementHandle h,
+ List<String> sqlCommands) throws NoSuchStatementException {
+ final CalciteConnectionImpl calciteConnection = getConnection();
+ final CalciteServerStatement statement =
+ calciteConnection.server.getStatement(h);
+ final List<Long> updateCounts = new ArrayList<>();
+ final Meta.PrepareCallback callback =
+ new Meta.PrepareCallback() {
+ long updateCount;
+ Signature signature;
+
+ public Object getMonitor() {
+ return statement;
+ }
+
+ public void clear() throws SQLException {}
+
+ public void assign(Meta.Signature signature, Meta.Frame firstFrame,
+ long updateCount) throws SQLException {
+ this.signature = signature;
+ this.updateCount = updateCount;
+ }
+
+ public void execute() throws SQLException {
+ if (signature.statementType.canUpdate()) {
+ final Iterable<Object> iterable =
+ _createIterable(h, signature, ImmutableList.<TypedValue>of(),
+ null);
+ final Iterator<Object> iterator = iterable.iterator();
+ updateCount = ((Number) iterator.next()).longValue();
+ }
+ updateCounts.add(updateCount);
+ }
+ };
+ for (String sqlCommand : sqlCommands) {
+ Util.discard(prepareAndExecute(h, sqlCommand, -1L, -1, callback));
+ }
+ return new ExecuteBatchResult(Longs.toArray(updateCounts));
+ }
+
/** A trojan-horse method, subject to change without notice. */
@VisibleForTesting
public static DataContext createDataContext(CalciteConnection connection) {
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
index 298b56c..2cb70e8 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
@@ -19,6 +19,7 @@ package org.apache.calcite.jdbc;
import org.apache.calcite.avatica.AvaticaResultSet;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.server.CalciteServerStatement;
@@ -48,7 +49,13 @@ public abstract class CalciteStatement extends AvaticaStatement {
@Override public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface == CalciteServerStatement.class) {
- return iface.cast(getConnection().server.getStatement(handle));
+ final CalciteServerStatement statement;
+ try {
+ statement = getConnection().server.getStatement(handle);
+ } catch (NoSuchStatementException e) {
+ throw new AssertionError("invalid statement", e);
+ }
+ return iface.cast(statement);
}
return super.unwrap(iface);
}
@@ -65,8 +72,12 @@ public abstract class CalciteStatement extends AvaticaStatement {
Queryable<T> queryable) {
final CalciteConnectionImpl calciteConnection = getConnection();
final CalcitePrepare prepare = calciteConnection.prepareFactory.apply();
- final CalciteServerStatement serverStatement =
- calciteConnection.server.getStatement(handle);
+ final CalciteServerStatement serverStatement;
+ try {
+ serverStatement = calciteConnection.server.getStatement(handle);
+ } catch (NoSuchStatementException e) {
+ throw new AssertionError("invalid statement", e);
+ }
final CalcitePrepare.Context prepareContext =
serverStatement.createPrepareContext();
return prepare.prepareQueryable(prepareContext, queryable);
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/server/CalciteServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/server/CalciteServer.java b/core/src/main/java/org/apache/calcite/server/CalciteServer.java
index b5e8b8e..9d769db 100644
--- a/core/src/main/java/org/apache/calcite/server/CalciteServer.java
+++ b/core/src/main/java/org/apache/calcite/server/CalciteServer.java
@@ -17,6 +17,7 @@
package org.apache.calcite.server;
import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.jdbc.CalciteConnection;
/**
@@ -30,7 +31,14 @@ public interface CalciteServer {
void addStatement(CalciteConnection connection, Meta.StatementHandle h);
- CalciteServerStatement getStatement(Meta.StatementHandle h);
+ /** Returns the statement with a given handle.
+ *
+ * @param h Statement handle
+ * @return Statement, never null
+ * @throws NoSuchStatementException if handle does not represent a statement
+ */
+ CalciteServerStatement getStatement(Meta.StatementHandle h)
+ throws NoSuchStatementException;
}
// End CalciteServer.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
index 7ce1ac4..18ab7bd 100644
--- a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
+++ b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.hamcrest.CoreMatchers;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -795,6 +796,37 @@ public class CalciteRemoteDriverTest {
assertThat(resultSet, nullValue());
int updateCount = statement.getUpdateCount();
assertThat(updateCount, is(1));
+ connection.close();
+ }
+
+ /** Test remote Statement batched insert. */
+ @Test public void testInsertBatch() throws Exception {
+ final Connection connection = DriverManager.getConnection(
+ "jdbc:avatica:remote:factory="
+ + LocalServiceModifiableFactory.class.getName());
+ assertThat(connection.getMetaData().supportsBatchUpdates(), is(true));
+ assertThat(connection.isClosed(), is(false));
+ Statement statement = connection.createStatement();
+ assertThat(statement.isClosed(), is(false));
+
+ String sql = "insert into \"foo\".\"bar\" values (1, 1, 'second', 2, 2)";
+ statement.addBatch(sql);
+ statement.addBatch(sql);
+ int[] updateCounts = statement.executeBatch();
+ assertThat(updateCounts.length, is(2));
+ assertThat(updateCounts[0], is(1));
+ assertThat(updateCounts[1], is(1));
+ ResultSet resultSet = statement.getResultSet();
+ assertThat(resultSet, nullValue());
+
+ // Now empty batch
+ statement.clearBatch();
+ updateCounts = statement.executeBatch();
+ assertThat(updateCounts.length, is(0));
+ resultSet = statement.getResultSet();
+ assertThat(resultSet, nullValue());
+
+ connection.close();
}
/**
http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 063f16d..cc3e5fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ limitations under the License.
<!-- This list is in alphabetical order. -->
<airlift-tpch.version>0.1</airlift-tpch.version>
- <avatica.version>1.7.1</avatica.version>
+ <avatica.version>1.8.0</avatica.version>
<build-helper-maven-plugin.version>1.9</build-helper-maven-plugin.version>
<cassandra-driver-core.version>3.0.0</cassandra-driver-core.version>
<checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>