You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/06/05 22:22:51 UTC

[10/12] calcite git commit: [CALCITE-1239] Upgrade to avatica-1.8.0

[CALCITE-1239] Upgrade to avatica-1.8.0


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/eb02ac3f
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/eb02ac3f
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/eb02ac3f

Branch: refs/heads/master
Commit: eb02ac3fcef9deb2b16668f5dc06c9a89c0e658e
Parents: e25ceef
Author: Julian Hyde <jh...@apache.org>
Authored: Mon May 23 08:47:30 2016 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Jun 4 21:35:29 2016 -0700

----------------------------------------------------------------------
 .../config/CalciteConnectionProperty.java       |  4 +-
 .../calcite/jdbc/CalciteConnectionImpl.java     | 10 ++-
 .../apache/calcite/jdbc/CalciteMetaImpl.java    | 94 ++++++++++++++++++--
 .../apache/calcite/jdbc/CalciteStatement.java   | 17 +++-
 .../apache/calcite/server/CalciteServer.java    | 10 ++-
 .../calcite/jdbc/CalciteRemoteDriverTest.java   | 32 +++++++
 pom.xml                                         |  2 +-
 7 files changed, 154 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
index 7bf157d..b63083c 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
@@ -18,7 +18,6 @@ package org.apache.calcite.config;
 
 import org.apache.calcite.avatica.ConnectionProperty;
 import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.util.Bug;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -48,8 +47,7 @@ public enum CalciteConnectionProperty implements ConnectionProperty {
 
   /** How many rows the Druid adapter should fetch at a time when executing
    * "select" queries. */
-  DRUID_FETCH("druidFetch", Type.STRING, "16384",
-      Bug.upgrade("convert to Type.NUMBER after [CALCITE-1207]")),
+  DRUID_FETCH("druidFetch", Type.NUMBER, 16384, false),
 
   /** URI of the model. */
   MODEL("model", Type.STRING, null, false),

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
index 4319fe0..61a8d28 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
@@ -26,6 +26,7 @@ import org.apache.calcite.avatica.Helper;
 import org.apache.calcite.avatica.InternalProperty;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
 import org.apache.calcite.avatica.UnregisteredDriver;
 import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -324,8 +325,13 @@ abstract class CalciteConnectionImpl
       statementMap.put(h.id, new CalciteServerStatementImpl(c));
     }
 
-    public CalciteServerStatement getStatement(Meta.StatementHandle h) {
-      return statementMap.get(h.id);
+    public CalciteServerStatement getStatement(Meta.StatementHandle h)
+        throws NoSuchStatementException {
+      CalciteServerStatement statement = statementMap.get(h.id);
+      if (statement == null) {
+        throw new NoSuchStatementException(h);
+      }
+      return statement;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index fec7346..434b1d4 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -57,6 +57,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
 
 import java.lang.reflect.Field;
 import java.sql.Connection;
@@ -151,7 +153,13 @@ public class CalciteMetaImpl extends MetaImpl {
 
   @Override public void closeStatement(StatementHandle h) {
     final CalciteConnectionImpl calciteConnection = getConnection();
-    CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
+    final CalciteServerStatement stmt;
+    try {
+      stmt = calciteConnection.server.getStatement(h);
+    } catch (NoSuchStatementException e) {
+      // statement is not valid; nothing to do
+      return;
+    }
     // stmt.close(); // TODO: implement
     calciteConnection.server.removeStatement(h);
   }
@@ -553,7 +561,13 @@ public class CalciteMetaImpl extends MetaImpl {
     final StatementHandle h = createStatement(ch);
     final CalciteConnectionImpl calciteConnection = getConnection();
 
-    CalciteServerStatement statement = calciteConnection.server.getStatement(h);
+    final CalciteServerStatement statement;
+    try {
+      statement = calciteConnection.server.getStatement(h);
+    } catch (NoSuchStatementException e) {
+      // Not possible. We just created a statement.
+      throw new AssertionError("missing statement", e);
+    }
     h.signature =
         calciteConnection.parseQuery(CalcitePrepare.Query.of(sql),
             statement.createPrepareContext(), maxRowCount);
@@ -562,7 +576,14 @@ public class CalciteMetaImpl extends MetaImpl {
   }
 
   @Override public ExecuteResult prepareAndExecute(StatementHandle h,
-      String sql, long maxRowCount, PrepareCallback callback) {
+      String sql, long maxRowCount, PrepareCallback callback)
+      throws NoSuchStatementException {
+    return prepareAndExecute(h, sql, maxRowCount, -1, callback);
+  }
+
+  @Override public ExecuteResult prepareAndExecute(StatementHandle h,
+      String sql, long maxRowCount, int maxRowsInFirstFrame,
+      PrepareCallback callback) throws NoSuchStatementException {
     final CalcitePrepare.CalciteSignature<Object> signature;
     try {
       synchronized (callback.getMonitor()) {
@@ -585,7 +606,8 @@ public class CalciteMetaImpl extends MetaImpl {
     // TODO: share code with prepare and createIterable
   }
 
-  @Override public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) {
+  @Override public Frame fetch(StatementHandle h, long offset,
+      int fetchMaxRowCount) throws NoSuchStatementException {
     final CalciteConnectionImpl calciteConnection = getConnection();
     CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
     final Signature signature = stmt.getSignature();
@@ -608,7 +630,14 @@ public class CalciteMetaImpl extends MetaImpl {
   }
 
   @Override public ExecuteResult execute(StatementHandle h,
-      List<TypedValue> parameterValues, long maxRowCount) {
+      List<TypedValue> parameterValues, long maxRowCount)
+      throws NoSuchStatementException {
+    return execute(h, parameterValues, Ints.saturatedCast(maxRowCount));
+  }
+
+  @Override public ExecuteResult execute(StatementHandle h,
+      List<TypedValue> parameterValues, int maxRowsInFirstFrame)
+      throws NoSuchStatementException {
     final CalciteConnectionImpl calciteConnection = getConnection();
     CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
     final Signature signature = stmt.getSignature();
@@ -633,6 +662,61 @@ public class CalciteMetaImpl extends MetaImpl {
     return new ExecuteResult(ImmutableList.of(metaResultSet));
   }
 
+  @Override public ExecuteBatchResult executeBatch(StatementHandle h,
+      List<List<TypedValue>> parameterValueLists) throws NoSuchStatementException {
+    final List<Long> updateCounts = new ArrayList<>();
+    for (List<TypedValue> parameterValueList : parameterValueLists) {
+      ExecuteResult executeResult = execute(h, parameterValueList, -1);
+      final long updateCount =
+          executeResult.resultSets.size() == 1
+              ? executeResult.resultSets.get(0).updateCount
+              : -1L;
+      updateCounts.add(updateCount);
+    }
+    return new ExecuteBatchResult(Longs.toArray(updateCounts));
+  }
+
+  @Override public ExecuteBatchResult prepareAndExecuteBatch(
+      final StatementHandle h,
+      List<String> sqlCommands) throws NoSuchStatementException {
+    final CalciteConnectionImpl calciteConnection = getConnection();
+    final CalciteServerStatement statement =
+        calciteConnection.server.getStatement(h);
+    final List<Long> updateCounts = new ArrayList<>();
+    final Meta.PrepareCallback callback =
+        new Meta.PrepareCallback() {
+          long updateCount;
+          Signature signature;
+
+          public Object getMonitor() {
+            return statement;
+          }
+
+          public void clear() throws SQLException {}
+
+          public void assign(Meta.Signature signature, Meta.Frame firstFrame,
+              long updateCount) throws SQLException {
+            this.signature = signature;
+            this.updateCount = updateCount;
+          }
+
+          public void execute() throws SQLException {
+            if (signature.statementType.canUpdate()) {
+              final Iterable<Object> iterable =
+                  _createIterable(h, signature, ImmutableList.<TypedValue>of(),
+                      null);
+              final Iterator<Object> iterator = iterable.iterator();
+              updateCount = ((Number) iterator.next()).longValue();
+            }
+            updateCounts.add(updateCount);
+          }
+        };
+    for (String sqlCommand : sqlCommands) {
+      Util.discard(prepareAndExecute(h, sqlCommand, -1L, -1, callback));
+    }
+    return new ExecuteBatchResult(Longs.toArray(updateCounts));
+  }
+
   /** A trojan-horse method, subject to change without notice. */
   @VisibleForTesting
   public static DataContext createDataContext(CalciteConnection connection) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
index 298b56c..2cb70e8 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
@@ -19,6 +19,7 @@ package org.apache.calcite.jdbc;
 import org.apache.calcite.avatica.AvaticaResultSet;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
 import org.apache.calcite.linq4j.Queryable;
 import org.apache.calcite.server.CalciteServerStatement;
 
@@ -48,7 +49,13 @@ public abstract class CalciteStatement extends AvaticaStatement {
 
   @Override public <T> T unwrap(Class<T> iface) throws SQLException {
     if (iface == CalciteServerStatement.class) {
-      return iface.cast(getConnection().server.getStatement(handle));
+      final CalciteServerStatement statement;
+      try {
+        statement = getConnection().server.getStatement(handle);
+      } catch (NoSuchStatementException e) {
+        throw new AssertionError("invalid statement", e);
+      }
+      return iface.cast(statement);
     }
     return super.unwrap(iface);
   }
@@ -65,8 +72,12 @@ public abstract class CalciteStatement extends AvaticaStatement {
       Queryable<T> queryable) {
     final CalciteConnectionImpl calciteConnection = getConnection();
     final CalcitePrepare prepare = calciteConnection.prepareFactory.apply();
-    final CalciteServerStatement serverStatement =
-        calciteConnection.server.getStatement(handle);
+    final CalciteServerStatement serverStatement;
+    try {
+      serverStatement = calciteConnection.server.getStatement(handle);
+    } catch (NoSuchStatementException e) {
+      throw new AssertionError("invalid statement", e);
+    }
     final CalcitePrepare.Context prepareContext =
         serverStatement.createPrepareContext();
     return prepare.prepareQueryable(prepareContext, queryable);

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/main/java/org/apache/calcite/server/CalciteServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/server/CalciteServer.java b/core/src/main/java/org/apache/calcite/server/CalciteServer.java
index b5e8b8e..9d769db 100644
--- a/core/src/main/java/org/apache/calcite/server/CalciteServer.java
+++ b/core/src/main/java/org/apache/calcite/server/CalciteServer.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.server;
 
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
 import org.apache.calcite.jdbc.CalciteConnection;
 
 /**
@@ -30,7 +31,14 @@ public interface CalciteServer {
 
   void addStatement(CalciteConnection connection, Meta.StatementHandle h);
 
-  CalciteServerStatement getStatement(Meta.StatementHandle h);
+  /** Returns the statement with a given handle.
+   *
+   * @param h Statement handle
+   * @return Statement, never null
+   * @throws NoSuchStatementException if handle does not represent a statement
+   */
+  CalciteServerStatement getStatement(Meta.StatementHandle h)
+      throws NoSuchStatementException;
 }
 
 // End CalciteServer.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
index 7ce1ac4..18ab7bd 100644
--- a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
+++ b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.hamcrest.CoreMatchers;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -795,6 +796,37 @@ public class CalciteRemoteDriverTest {
     assertThat(resultSet, nullValue());
     int updateCount = statement.getUpdateCount();
     assertThat(updateCount, is(1));
+    connection.close();
+  }
+
+  /** Test remote Statement batched insert. */
+  @Test public void testInsertBatch() throws Exception {
+    final Connection connection = DriverManager.getConnection(
+        "jdbc:avatica:remote:factory="
+            + LocalServiceModifiableFactory.class.getName());
+    assertThat(connection.getMetaData().supportsBatchUpdates(), is(true));
+    assertThat(connection.isClosed(), is(false));
+    Statement statement = connection.createStatement();
+    assertThat(statement.isClosed(), is(false));
+
+    String sql = "insert into \"foo\".\"bar\" values (1, 1, 'second', 2, 2)";
+    statement.addBatch(sql);
+    statement.addBatch(sql);
+    int[] updateCounts = statement.executeBatch();
+    assertThat(updateCounts.length, is(2));
+    assertThat(updateCounts[0], is(1));
+    assertThat(updateCounts[1], is(1));
+    ResultSet resultSet = statement.getResultSet();
+    assertThat(resultSet, nullValue());
+
+    // Now empty batch
+    statement.clearBatch();
+    updateCounts = statement.executeBatch();
+    assertThat(updateCounts.length, is(0));
+    resultSet = statement.getResultSet();
+    assertThat(resultSet, nullValue());
+
+    connection.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/eb02ac3f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 063f16d..cc3e5fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ limitations under the License.
 
     <!-- This list is in alphabetical order. -->
     <airlift-tpch.version>0.1</airlift-tpch.version>
-    <avatica.version>1.7.1</avatica.version>
+    <avatica.version>1.8.0</avatica.version>
     <build-helper-maven-plugin.version>1.9</build-helper-maven-plugin.version>
     <cassandra-driver-core.version>3.0.0</cassandra-driver-core.version>
     <checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>