You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/06/05 19:04:50 UTC

[6/6] incubator-calcite git commit: [CALCITE-718] Enable fetch to work for Statement.execute() for Avatica (Xavier Leong)

[CALCITE-718] Enable fetch to work for Statement.execute() for Avatica (Xavier Leong)

Close apache/incubator-calcite#88

Close apache/incubator-calcite#87 (forgot to close it in recent commit)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/ec9d9660
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/ec9d9660
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/ec9d9660

Branch: refs/heads/master
Commit: ec9d966042f870c70593fca9e8bf55f8f74317e9
Parents: e8c3ccc
Author: Xavier Leong <le...@persistent.my>
Authored: Fri May 29 15:06:01 2015 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Jun 5 01:37:57 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 50 +++++++++++---------
 .../calcite/avatica/jdbc/JdbcResultSet.java     | 12 +++--
 .../calcite/avatica/RemoteDriverTest.java       | 41 ++++++++++++++++
 .../calcite/avatica/AvaticaConnection.java      |  2 +-
 .../java/org/apache/calcite/avatica/Meta.java   |  6 +--
 .../calcite/avatica/remote/LocalService.java    |  9 ++--
 .../calcite/avatica/remote/RemoteMeta.java      |  9 ++--
 .../apache/calcite/avatica/remote/Service.java  |  5 +-
 .../apache/calcite/jdbc/CalciteMetaImpl.java    |  5 +-
 9 files changed, 98 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 3fe1c2a..d1a2049 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -714,16 +714,20 @@ public class JdbcMeta implements Meta {
     }
   }
 
-  public ExecuteResult prepareAndExecute(ConnectionHandle ch, String sql,
+  public ExecuteResult prepareAndExecute(StatementHandle h, String sql,
       int maxRowCount, PrepareCallback callback) {
     try {
-      final Connection connection = getConnection(ch.id);
-      final Statement statement = connection.createStatement();
-      final int id = System.identityHashCode(statement);
-      final StatementInfo info = new StatementInfo(statement);
-      statementCache.put(id, info); // TODO: must we retain a statement in all cases?
+      final StatementInfo info = statementCache.getIfPresent(h.id);
+      if (info == null) {
+        throw new RuntimeException("Statement not found, potentially expired. "
+            + h);
+      }
+      final Statement statement = info.statement;
+      // Special handling of maxRowCount: in JDBC 0 means unlimited, in Meta 0 means zero rows
       if (maxRowCount > 0) {
         statement.setMaxRows(maxRowCount);
+      } else if (maxRowCount < 0) {
+        statement.setMaxRows(0);
       }
       boolean ret = statement.execute(sql);
       info.resultSet = statement.getResultSet();
@@ -732,12 +736,14 @@ public class JdbcMeta implements Meta {
       if (info.resultSet == null) {
         // Create a special result set that just carries update count
         resultSets.add(
-            MetaResultSet.count(ch.id, id, statement.getUpdateCount()));
+            MetaResultSet.count(h.connectionId, h.id,
+                statement.getUpdateCount()));
       } else {
-        resultSets.add(JdbcResultSet.create(ch.id, id, info.resultSet, maxRowCount));
+        resultSets.add(
+            JdbcResultSet.create(h.connectionId, h.id, info.resultSet,
+                maxRowCount));
       }
       if (LOG.isTraceEnabled()) {
-        StatementHandle h = new StatementHandle(ch.id, id, null);
         LOG.trace("prepAndExec statement " + h);
       }
       // TODO: review client to ensure statementId is updated when appropriate
@@ -750,26 +756,26 @@ public class JdbcMeta implements Meta {
   public Frame fetch(StatementHandle h, List<TypedValue> parameterValues,
       int offset, int fetchMaxRowCount) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
+      LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:"
+          + fetchMaxRowCount);
     }
     try {
       final StatementInfo statementInfo = Objects.requireNonNull(
           statementCache.getIfPresent(h.id),
           "Statement not found, potentially expired. " + h);
       if (statementInfo.resultSet == null || parameterValues != null) {
-        if (statementInfo.resultSet != null) {
-          statementInfo.resultSet.close();
-        }
-        final PreparedStatement preparedStatement =
-            (PreparedStatement) statementInfo.statement;
-        if (parameterValues != null) {
-          for (int i = 0; i < parameterValues.size(); i++) {
-            TypedValue o = parameterValues.get(i);
-            preparedStatement.setObject(i + 1, o.toJdbc(calendar));
+        if (statementInfo.statement instanceof PreparedStatement) {
+          final PreparedStatement preparedStatement =
+              (PreparedStatement) statementInfo.statement;
+          if (parameterValues != null) {
+            for (int i = 0; i < parameterValues.size(); i++) {
+              TypedValue o = parameterValues.get(i);
+              preparedStatement.setObject(i + 1, o.toJdbc(calendar));
+            }
+          }
+          if (preparedStatement.execute()) {
+            statementInfo.resultSet = preparedStatement.getResultSet();
           }
-        }
-        if (preparedStatement.execute()) {
-          statementInfo.resultSet = preparedStatement.getResultSet();
         }
       }
       if (statementInfo.resultSet == null) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index 3f6678f..ae67b50 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -52,8 +52,12 @@ class JdbcResultSet extends Meta.MetaResultSet {
     try {
       Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData());
       final Calendar calendar = Calendar.getInstance(DateTimeUtils.GMT_ZONE);
-      final Meta.Frame firstFrame = frame(resultSet, 0, maxRowCount, calendar);
-      resultSet.close();
+      final int fetchRowCount =
+        (maxRowCount == -1 || maxRowCount > 100) ? 100 : maxRowCount;
+      final Meta.Frame firstFrame = frame(resultSet, 0, fetchRowCount, calendar);
+      if (firstFrame.done) {
+        resultSet.close();
+      }
       return new JdbcResultSet(connectionId, statementId, true, sig,
           firstFrame);
     } catch (SQLException e) {
@@ -72,10 +76,12 @@ class JdbcResultSet extends Meta.MetaResultSet {
       types[i] = metaData.getColumnType(i + 1);
     }
     final List<Object> rows = new ArrayList<>();
-    boolean done = false;
+    // In Meta prepare/prepareAndExecute, a row count of 0 means return 0 rows and done
+    boolean done = fetchMaxRowCount == 0 ? true : false;
     for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) {
       if (!resultSet.next()) {
         done = true;
+        resultSet.close();
         break;
       }
       Object[] columns = new Object[columnCount];

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
index 4dd740b..e022ddc 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
@@ -285,6 +285,47 @@ public class RemoteDriverTest {
     checkStatementExecute(ljs(), false);
   }
 
+  @Test public void testStatementExecuteFetch() throws Exception {
+    // Create a query that returns more than 100 rows, to trigger a fetch request
+    String sql = "select * from emp cross join emp";
+    checkExecuteFetch(ljs(), sql, false, 1);
+    // PreparedStatement needs an extra fetch, because its execute will
+    // trigger the 1st fetch, whereas Statement execute will execute directly
+    // and come back with results.
+    checkExecuteFetch(ljs(), sql, true, 2);
+  }
+
+  private void checkExecuteFetch(Connection conn, String sql, boolean isPrepare,
+    int fetchCountMatch) throws SQLException {
+    final Statement exeStatement;
+    final ResultSet results;
+    LoggingLocalJsonService.THREAD_LOG.get().enableAndClear();
+    if (isPrepare) {
+      PreparedStatement statement = conn.prepareStatement(sql);
+      exeStatement = statement;
+      results = statement.executeQuery();
+    } else {
+      Statement statement = conn.createStatement();
+      exeStatement = statement;
+      results = statement.executeQuery(sql);
+    }
+    int count = 0;
+    int fetchCount = 0;
+    while (results.next()) {
+      count++;
+    }
+    results.close();
+    exeStatement.close();
+    List<String[]> x = LoggingLocalJsonService.THREAD_LOG.get().getAndDisable();
+    for (String[] pair : x) {
+      if (pair[0].contains("\"request\":\"fetch")) {
+        fetchCount++;
+      }
+    }
+    assertEquals(count, 196);
+    assertEquals(fetchCount, fetchCountMatch);
+  }
+
   @Test public void testStatementExecuteLocalMaxRow() throws Exception {
     checkStatementExecute(ljs(), false, 2);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 1a6ea44..1ed561c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -474,7 +474,7 @@ public abstract class AvaticaConnection implements Connection {
             }
           }
         };
-    return meta.prepareAndExecute(handle, sql, maxRowCount, callback);
+    return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback);
   }
 
   protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index e98c7f1..5dd59f0 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -168,15 +168,15 @@ public interface Meta {
 
   /** Prepares and executes a statement.
    *
-   * @param ch Connection handle
+   * @param h Statement handle
    * @param sql SQL query
    * @param maxRowCount Negative for no limit (different meaning than JDBC)
    * @param callback Callback to lock, clear and assign cursor
    *
    * @return Result containing statement ID, and if a query, a result set and
-   * first frame of data
+   *     first frame of data
    */
-  ExecuteResult prepareAndExecute(ConnectionHandle ch, String sql,
+  ExecuteResult prepareAndExecute(StatementHandle h, String sql,
       int maxRowCount, PrepareCallback callback);
 
   /** Returns a frame of rows.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index ae4c9d0..f03d55b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -67,7 +67,6 @@ public class LocalService implements Service {
       }
     } else {
       //noinspection unchecked
-      list = (List<Object>) (List) list2(resultSet);
       cursorFactory = Meta.CursorFactory.LIST;
     }
     Meta.Signature signature = resultSet.signature;
@@ -75,7 +74,7 @@ public class LocalService implements Service {
       signature = signature.setCursorFactory(cursorFactory);
     }
     return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
-        resultSet.ownStatement, signature, new Meta.Frame(0, true, list), -1);
+        resultSet.ownStatement, signature, resultSet.firstFrame, -1);
   }
 
   private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
@@ -136,10 +135,10 @@ public class LocalService implements Service {
   }
 
   public ExecuteResponse apply(PrepareAndExecuteRequest request) {
-    final Meta.ConnectionHandle ch =
-        new Meta.ConnectionHandle(request.connectionId);
+    final Meta.StatementHandle sh =
+        new Meta.StatementHandle(request.connectionId, request.statementId, null);
     final Meta.ExecuteResult executeResult =
-        meta.prepareAndExecute(ch, request.sql, request.maxRowCount,
+        meta.prepareAndExecute(sh, request.sql, request.maxRowCount,
             new Meta.PrepareCallback() {
               @Override public Object getMonitor() {
                 return LocalService.class;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 456cf08..f0abe2d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -165,15 +165,18 @@ class RemoteMeta extends MetaImpl {
     return response.statement;
   }
 
-  @Override public ExecuteResult prepareAndExecute(ConnectionHandle ch,
+  @Override public ExecuteResult prepareAndExecute(StatementHandle h,
       String sql, int maxRowCount, PrepareCallback callback) {
-    connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection state if necessary
+    // sync connection state if necessary
+    connectionSync(new ConnectionHandle(h.connectionId),
+      new ConnectionPropertiesImpl());
     final Service.ExecuteResponse response;
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
         response = service.apply(
-            new Service.PrepareAndExecuteRequest(ch.id, sql, maxRowCount));
+            new Service.PrepareAndExecuteRequest(h.connectionId,
+              h.id, sql, maxRowCount));
         if (response.results.size() > 0) {
           final Service.ResultSetResponse result = response.results.get(0);
           callback.assign(result.signature, result.firstFrame,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index 951b34e..9a8b5da 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -241,18 +241,21 @@ public interface Service {
   }
 
   /** Request for
-   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
+   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(Meta.StatementHandle, String, int, Meta.PrepareCallback)}. */
   class PrepareAndExecuteRequest extends Request {
     public final String connectionId;
     public final String sql;
     public final int maxRowCount;
+    public final int statementId;
 
     @JsonCreator
     public PrepareAndExecuteRequest(
         @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId,
         @JsonProperty("sql") String sql,
         @JsonProperty("maxRowCount") int maxRowCount) {
       this.connectionId = connectionId;
+      this.statementId = statementId;
       this.sql = sql;
       this.maxRowCount = maxRowCount;
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/ec9d9660/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index c466819..332ed40 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -548,11 +548,9 @@ public class CalciteMetaImpl extends MetaImpl {
     return h;
   }
 
-  @Override public ExecuteResult prepareAndExecute(ConnectionHandle ch,
+  @Override public ExecuteResult prepareAndExecute(StatementHandle h,
       String sql, int maxRowCount, PrepareCallback callback) {
     final CalcitePrepare.CalciteSignature<Object> signature;
-    final StatementHandle h = createStatement(ch);
-
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
@@ -561,6 +559,7 @@ public class CalciteMetaImpl extends MetaImpl {
             calciteConnection.server.getStatement(h);
         signature = calciteConnection.parseQuery(sql,
             statement.createPrepareContext(), maxRowCount);
+        statement.setSignature(signature);
         callback.assign(signature, null, -1);
       }
       callback.execute();