You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/04/03 15:02:42 UTC
[09/10] incubator-calcite git commit: [CALCITE-646] AvaticaStatement
execute method broken over remote JDBC (Yeong Wei and Julian Hyde)
[CALCITE-646] AvaticaStatement execute method broken over remote JDBC (Yeong Wei and Julian Hyde)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0d80fd25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0d80fd25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0d80fd25
Branch: refs/heads/master
Commit: 0d80fd25315bc53f71622415c8bb3d364e634507
Parents: 272e604
Author: Julian Hyde <jh...@apache.org>
Authored: Thu Apr 2 23:50:27 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Apr 3 01:10:06 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 28 ++---
.../calcite/avatica/jdbc/JdbcResultSet.java | 2 +-
.../calcite/avatica/RemoteDriverTest.java | 105 +++++++++++++++----
.../calcite/avatica/AvaticaConnection.java | 28 ++---
.../calcite/avatica/AvaticaStatement.java | 79 +++++++-------
.../java/org/apache/calcite/avatica/Meta.java | 50 +++++++--
.../org/apache/calcite/avatica/MetaImpl.java | 2 +-
.../calcite/avatica/remote/JsonService.java | 47 ++++++---
.../calcite/avatica/remote/LocalService.java | 19 +++-
.../calcite/avatica/remote/MockJsonService.java | 8 +-
.../calcite/avatica/remote/RemoteMeta.java | 23 +++-
.../apache/calcite/avatica/remote/Service.java | 28 ++++-
.../apache/calcite/jdbc/CalciteMetaImpl.java | 16 +--
.../calcite/jdbc/CalciteRemoteDriverTest.java | 17 +++
14 files changed, 311 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 698d7a7..f9d783d 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -88,13 +88,6 @@ public class JdbcMeta implements Meta {
SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
}
- /** A "magic column" descriptor used to return statement execute update count via a
- * {@link Meta.MetaResultSet}. */
- private static final ColumnMetaData UPDATE_COL = new ColumnMetaData(1, false, false, false,
- false, ResultSetMetaData.columnNoNulls, true, 10, "u", "u", null, 15, 15, "update_result",
- "avatica_internal", ColumnMetaData.scalar(Types.INTEGER, "INTEGER",
- ColumnMetaData.Rep.INTEGER), true, false, false, "java.lang.Integer");
-
private static final String CONN_CACHE_KEY_BASE = "avatica.connectioncache";
/** Configurable connection cache settings. */
@@ -751,7 +744,7 @@ public class JdbcMeta implements Meta {
}
}
- public MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
+ public ExecuteResult prepareAndExecute(ConnectionHandle ch, String sql,
int maxRowCount, PrepareCallback callback) {
try {
final Connection connection = getConnection(ch.id);
@@ -762,27 +755,20 @@ public class JdbcMeta implements Meta {
boolean ret = statement.execute();
info.resultSet = statement.getResultSet();
assert ret || info.resultSet == null;
- final MetaResultSet mrs;
+ final List<MetaResultSet> resultSets = new ArrayList<>();
if (info.resultSet == null) {
- // build a non-JDBC result that contains the update count
- int updateCount = statement.getUpdateCount();
- List<ColumnMetaData> columns = new ArrayList<>(1);
- columns.add(UPDATE_COL);
- List<Object> val = new ArrayList<>();
- val.add(new Object[] { updateCount });
- final Signature signature =
- new Signature(columns, sql, null, null, CursorFactory.ARRAY);
- final Frame frame = new Frame(0, true, val);
- mrs = new MetaResultSet(ch.id, id, true, signature, frame);
+ // Create a special result set that just carries update count
+ resultSets.add(
+ MetaResultSet.count(ch.id, id, statement.getUpdateCount()));
} else {
- mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
+ resultSets.add(JdbcResultSet.create(ch.id, id, info.resultSet));
}
if (LOG.isTraceEnabled()) {
StatementHandle h = new StatementHandle(ch.id, id, null);
LOG.trace("prepAndExec statement " + h);
}
// TODO: review client to ensure statementId is updated when appropriate
- return mrs;
+ return new ExecuteResult(resultSets);
} catch (SQLException e) {
throw propagate(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index 407b91e..bcc2745 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -36,7 +36,7 @@ import java.util.List;
class JdbcResultSet extends Meta.MetaResultSet {
protected JdbcResultSet(String connectionId, int statementId,
boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame) {
- super(connectionId, statementId, ownStatement, signature, firstFrame);
+ super(connectionId, statementId, ownStatement, signature, firstFrame, -1);
}
/** Creates a result set. */
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
index 1a5316c..f23e2fc 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
@@ -45,6 +45,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -202,16 +203,95 @@ public class RemoteDriverTest {
final String insert = "insert into TEST_TABLE values(1, 'foo')";
final String update = "update TEST_TABLE set msg='bar' where id=1";
try (Connection connection = ljs();
- Statement statement = connection.createStatement()) {
- boolean ret;
+ Statement statement = connection.createStatement();
+ PreparedStatement pstmt = connection.prepareStatement("values 1")) {
+ // drop
assertFalse(statement.execute(drop));
assertEquals(0, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+ try {
+ final ResultSet rs = statement.executeQuery(drop);
+ fail("expected error, got " + rs);
+ } catch (SQLException e) {
+ assertThat(e.getMessage(),
+ equalTo("Statement did not return a result set"));
+ }
+ assertEquals(0, statement.executeUpdate(drop));
+ assertEquals(0, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+
+ // create
assertFalse(statement.execute(create));
assertEquals(0, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+ assertFalse(statement.execute(drop)); // tidy up
+ try {
+ final ResultSet rs = statement.executeQuery(create);
+ fail("expected error, got " + rs);
+ } catch (SQLException e) {
+ assertThat(e.getMessage(),
+ equalTo("Statement did not return a result set"));
+ }
+ assertFalse(statement.execute(drop)); // tidy up
+ assertEquals(0, statement.executeUpdate(create));
+ assertEquals(0, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+
+ // insert
assertFalse(statement.execute(insert));
assertEquals(1, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+ try {
+ final ResultSet rs = statement.executeQuery(insert);
+ fail("expected error, got " + rs);
+ } catch (SQLException e) {
+ assertThat(e.getMessage(),
+ equalTo("Statement did not return a result set"));
+ }
+ assertEquals(1, statement.executeUpdate(insert));
+ assertEquals(1, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+
+ // update
assertFalse(statement.execute(update));
- assertEquals(0, statement.executeUpdate(drop));
+ assertEquals(3, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+ try {
+ final ResultSet rs = statement.executeQuery(update);
+ fail("expected error, got " + rs);
+ } catch (SQLException e) {
+ assertThat(e.getMessage(),
+ equalTo("Statement did not return a result set"));
+ }
+ assertEquals(3, statement.executeUpdate(update));
+ assertEquals(3, statement.getUpdateCount());
+ assertNull(statement.getResultSet());
+
+ final String[] messages = {
+ "Cannot call executeQuery(String) on prepared or callable statement",
+ "Cannot call execute(String) on prepared or callable statement",
+ "Cannot call executeUpdate(String) on prepared or callable statement",
+ };
+ for (String sql : new String[]{drop, create, insert, update}) {
+ for (int i = 0; i <= 2; i++) {
+ try {
+ Object o;
+ switch (i) {
+ case 0:
+ o = pstmt.executeQuery(sql);
+ break;
+ case 1:
+ o = pstmt.execute(sql);
+ break;
+ default:
+ o = pstmt.executeUpdate(sql);
+ }
+ fail("expected error, got " + o);
+ } catch (SQLException e) {
+ assertThat(e.getMessage(), equalTo(messages[i]));
+ }
+ }
+ }
}
}
@@ -294,25 +374,6 @@ public class RemoteDriverTest {
0, connectionMap.size());
}
- private void checkStatementExecuteQuery(Connection connection)
- throws SQLException {
- final Statement statement = connection.createStatement();
- final ResultSet resultSet =
- statement.executeQuery("select * from (\n"
- + " values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)");
- final ResultSetMetaData metaData = resultSet.getMetaData();
- assertEquals(2, metaData.getColumnCount());
- assertEquals("C1", metaData.getColumnName(1));
- assertEquals("C2", metaData.getColumnName(2));
- assertTrue(resultSet.next());
- assertTrue(resultSet.next());
- assertTrue(resultSet.next());
- assertFalse(resultSet.next());
- resultSet.close();
- statement.close();
- connection.close();
- }
-
@Test public void testPrepareBindExecuteFetch() throws Exception {
checkPrepareBindExecuteFetch(ljs());
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 2c66d1e..f6bb25a 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -432,10 +432,10 @@ public abstract class AvaticaConnection implements Connection {
return statement.openResultSet;
}
- protected ResultSet prepareAndExecuteInternal(
+ protected Meta.ExecuteResult prepareAndExecuteInternal(
final AvaticaStatement statement, String sql, int maxRowCount)
throws SQLException {
- Meta.MetaResultSet x = meta.prepareAndExecute(handle, sql, maxRowCount,
+ final Meta.PrepareCallback callback =
new Meta.PrepareCallback() {
public Object getMonitor() {
return statement;
@@ -454,20 +454,24 @@ public abstract class AvaticaConnection implements Connection {
}
}
- public void assign(Meta.Signature signature, Meta.Frame firstFrame)
- throws SQLException {
- final TimeZone timeZone = getTimeZone();
- statement.openResultSet =
- factory.newResultSet(statement, signature, timeZone,
- firstFrame);
+ public void assign(Meta.Signature signature, Meta.Frame firstFrame,
+ int updateCount) throws SQLException {
+ if (updateCount != -1) {
+ statement.updateCount = updateCount;
+ } else {
+ final TimeZone timeZone = getTimeZone();
+ statement.openResultSet = factory.newResultSet(statement,
+ signature, timeZone, firstFrame);
+ }
}
public void execute() throws SQLException {
- statement.openResultSet.execute();
+ if (statement.openResultSet != null) {
+ statement.openResultSet.execute();
+ }
}
- });
- assert statement.openResultSet != null;
- return statement.openResultSet;
+ };
+ return meta.prepareAndExecute(handle, sql, maxRowCount, callback);
}
protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 1a7e5ec..8deb643 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -16,8 +16,9 @@
*/
package org.apache.calcite.avatica;
+import java.sql.CallableStatement;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
@@ -49,6 +50,9 @@ public abstract class AvaticaStatement
*/
protected AvaticaResultSet openResultSet;
+ /** Current update count. Same lifecycle as {@link #openResultSet}. */
+ protected int updateCount;
+
private int queryTimeoutMillis;
final int resultSetType;
final int resultSetConcurrency;
@@ -56,7 +60,6 @@ public abstract class AvaticaStatement
private int fetchSize;
private int fetchDirection;
protected int maxRowCount = 0;
- private int updateCount = -1;
/**
* Creates an AvaticaStatement.
@@ -88,37 +91,47 @@ public abstract class AvaticaStatement
return handle.id;
}
- // implement Statement
-
- public boolean execute(String sql) throws SQLException {
- ResultSet resultSet = executeQuery(sql);
- ResultSetMetaData md = resultSet.getMetaData();
- // hackish, but be sure we're looking at an update count result, not user data
- if (md.getCatalogName(1).equalsIgnoreCase("avatica_internal")
- && md.getTableName(1).equalsIgnoreCase("update_result")
- && md.getColumnCount() == 1
- && md.getColumnName(1).equalsIgnoreCase("u")) {
- if (!resultSet.next()) {
- throw new SQLException("expected one row, got zero");
- }
- this.updateCount = resultSet.getInt(1);
- if (resultSet.next()) {
- throw new SQLException("expected one row, got two or more");
- }
- resultSet.close();
- return false;
- } else {
- return !resultSet.isClosed();
+ private void checkNotPreparedOrCallable(String s) throws SQLException {
+ if (this instanceof PreparedStatement
+ || this instanceof CallableStatement) {
+ throw connection.helper.createException("Cannot call " + s
+ + " on prepared or callable statement");
}
}
- public ResultSet executeQuery(String sql) throws SQLException {
+ protected void executeInternal(String sql) throws SQLException {
// reset previous state before moving forward.
this.updateCount = -1;
try {
// In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0
final int maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount;
- return connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+ Meta.ExecuteResult x =
+ connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+ } catch (RuntimeException e) {
+ throw connection.helper.createException(
+ "error while executing SQL \"" + sql + "\": " + e.getMessage(), e);
+ }
+ }
+
+ // implement Statement
+
+ public boolean execute(String sql) throws SQLException {
+ checkNotPreparedOrCallable("execute(String)");
+ executeInternal(sql);
+ // Result set is null for DML or DDL.
+ // Result set is closed if user cancelled the query.
+ return openResultSet != null && !openResultSet.isClosed();
+ }
+
+ public ResultSet executeQuery(String sql) throws SQLException {
+ checkNotPreparedOrCallable("executeQuery(String)");
+ try {
+ executeInternal(sql);
+ if (openResultSet == null) {
+ throw connection.helper.createException(
+ "Statement did not return a result set");
+ }
+ return openResultSet;
} catch (RuntimeException e) {
throw connection.helper.createException(
"error while executing SQL \"" + sql + "\": " + e.getMessage(), e);
@@ -126,19 +139,9 @@ public abstract class AvaticaStatement
}
public int executeUpdate(String sql) throws SQLException {
- ResultSet resultSet = executeQuery(sql);
- if (resultSet.getMetaData().getColumnCount() != 1) {
- throw new SQLException("expected one result column");
- }
- if (!resultSet.next()) {
- throw new SQLException("expected one row, got zero");
- }
- this.updateCount = resultSet.getInt(1);
- if (resultSet.next()) {
- throw new SQLException("expected one row, got two or more");
- }
- resultSet.close();
- return this.updateCount;
+ checkNotPreparedOrCallable("executeUpdate(String)");
+ executeInternal(sql);
+ return updateCount;
}
public synchronized void close() throws SQLException {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index 49fc3a5..cadcd17 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -168,9 +168,11 @@ public interface Meta {
* @param sql SQL query
* @param maxRowCount Negative for no limit (different meaning than JDBC)
* @param callback Callback to lock, clear and assign cursor
- * @return MetaResultSet containing statement ID and first frame of data
+ *
+ * @return Result containing statement ID, and if a query, a result set and
+ * first frame of data
*/
- MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
+ ExecuteResult prepareAndExecute(ConnectionHandle ch, String sql,
int maxRowCount, PrepareCallback callback);
/** Returns a frame of rows.
@@ -234,21 +236,53 @@ public interface Meta {
}
}
- /** Meta data from which a result set can be constructed. */
+ /** Response from execute.
+ *
+ * <p>Typically a query will have a result set and rowCount = -1;
+ * a DML statement will have a rowCount and no result sets.
+ */
+ class ExecuteResult {
+ public final List<MetaResultSet> resultSets;
+
+ public ExecuteResult(List<MetaResultSet> resultSets) {
+ this.resultSets = resultSets;
+ }
+ }
+
+ /** Meta data from which a result set can be constructed.
+ *
+ * <p>If {@code updateCount} is not -1, the result is just a count. A result
+ * set cannot be constructed. */
class MetaResultSet {
public final String connectionId;
public final int statementId;
public final boolean ownStatement;
public final Frame firstFrame;
public final Signature signature;
+ public final int updateCount;
- public MetaResultSet(String connectionId, int statementId,
- boolean ownStatement, Signature signature, Frame firstFrame) {
- this.signature = Objects.requireNonNull(signature);
+ protected MetaResultSet(String connectionId, int statementId,
+ boolean ownStatement, Signature signature, Frame firstFrame,
+ int updateCount) {
+ this.signature = signature;
this.connectionId = connectionId;
this.statementId = statementId;
this.ownStatement = ownStatement;
- this.firstFrame = firstFrame; // may be null
+ this.firstFrame = firstFrame; // may be null even if signature is not null
+ this.updateCount = updateCount;
+ }
+
+ public static MetaResultSet create(String connectionId, int statementId,
+ boolean ownStatement, Signature signature, Frame firstFrame) {
+ return new MetaResultSet(connectionId, statementId, ownStatement,
+ Objects.requireNonNull(signature), firstFrame, -1);
+ }
+
+ public static MetaResultSet count(String connectionId, int statementId,
+ int updateCount) {
+ assert updateCount >= 0;
+ return new MetaResultSet(connectionId, statementId, false, null, null,
+ updateCount);
}
}
@@ -555,7 +589,7 @@ public interface Meta {
interface PrepareCallback {
Object getMonitor();
void clear() throws SQLException;
- void assign(Signature signature, Frame firstFrame)
+ void assign(Signature signature, Frame firstFrame, int updateCount)
throws SQLException;
void execute() throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index b8820e8..aa21194 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -243,7 +243,7 @@ public abstract class MetaImpl implements Meta {
final Signature signature =
new Signature(columns, "", Collections.<AvaticaParameter>emptyList(),
internalParameters, cursorFactory);
- return new MetaResultSet(connection.id, statement.getId(), true,
+ return MetaResultSet.create(connection.id, statement.getId(), true,
signature, firstFrame);
} catch (SQLException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 2dd349c..d8e22e6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -46,8 +46,11 @@ public abstract class JsonService implements Service {
* responses to and from the peer service. */
public abstract String apply(String request);
- /** Modifies a signature, changing the type of columns within it. */
- private static Meta.Signature fangle(Meta.Signature signature) {
+ /** Modifies a signature, changing the representation of numeric columns
+ * within it. This deals with the fact that JSON transmits a small long value,
+ * or a float which is a whole number, as an integer. Thus the accessors need
+ * be prepared to accept any numeric type. */
+ private static Meta.Signature finagle(Meta.Signature signature) {
final List<ColumnMetaData> columns = new ArrayList<>();
int changeCount = 0;
for (ColumnMetaData column : signature.columns) {
@@ -77,29 +80,49 @@ public abstract class JsonService implements Service {
signature.cursorFactory);
}
- private static PrepareResponse fangle(PrepareResponse response) {
- final Meta.StatementHandle statement = fangle(response.statement);
+ private static PrepareResponse finagle(PrepareResponse response) {
+ final Meta.StatementHandle statement = finagle(response.statement);
if (statement == response.statement) {
return response;
}
return new PrepareResponse(statement);
}
- private static Meta.StatementHandle fangle(Meta.StatementHandle h) {
- final Meta.Signature signature = fangle(h.signature);
+ private static Meta.StatementHandle finagle(Meta.StatementHandle h) {
+ final Meta.Signature signature = finagle(h.signature);
if (signature == h.signature) {
return h;
}
return new Meta.StatementHandle(h.connectionId, h.id, signature);
}
- private static ResultSetResponse fangle(ResultSetResponse r) {
- final Meta.Signature signature = fangle(r.signature);
+ private static ResultSetResponse finagle(ResultSetResponse r) {
+ if (r.updateCount != -1) {
+ assert r.signature == null;
+ return r;
+ }
+ final Meta.Signature signature = finagle(r.signature);
if (signature == r.signature) {
return r;
}
return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
- signature, r.firstFrame);
+ signature, r.firstFrame, r.updateCount);
+ }
+
+ private static ExecuteResponse finagle(ExecuteResponse r) {
+ final List<ResultSetResponse> results = new ArrayList<>();
+ int changeCount = 0;
+ for (ResultSetResponse result : r.results) {
+ ResultSetResponse result2 = finagle(result);
+ if (result2 != result) {
+ ++changeCount;
+ }
+ results.add(result2);
+ }
+ if (changeCount == 0) {
+ return r;
+ }
+ return new ExecuteResponse(results);
}
//@VisibleForTesting
@@ -161,15 +184,15 @@ public abstract class JsonService implements Service {
public PrepareResponse apply(PrepareRequest request) {
try {
- return fangle(decode(apply(encode(request)), PrepareResponse.class));
+ return finagle(decode(apply(encode(request)), PrepareResponse.class));
} catch (IOException e) {
throw handle(e);
}
}
- public ResultSetResponse apply(PrepareAndExecuteRequest request) {
+ public ExecuteResponse apply(PrepareAndExecuteRequest request) {
try {
- return fangle(decode(apply(encode(request)), ResultSetResponse.class));
+ return finagle(decode(apply(encode(request)), ExecuteResponse.class));
} catch (IOException e) {
throw handle(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 2bdb1bd..aad40e1 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -46,6 +46,11 @@ public class LocalService implements Service {
/** Converts a result set (not serializable) into a serializable response. */
public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) {
+ if (resultSet.updateCount != -1) {
+ return new ResultSetResponse(resultSet.connectionId,
+ resultSet.statementId, resultSet.ownStatement, null, null,
+ resultSet.updateCount);
+ }
Meta.CursorFactory cursorFactory = resultSet.signature.cursorFactory;
final List<Object> list;
if (resultSet.firstFrame != null) {
@@ -70,7 +75,7 @@ public class LocalService implements Service {
signature = signature.setCursorFactory(cursorFactory);
}
return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
- resultSet.ownStatement, signature, new Meta.Frame(0, true, list));
+ resultSet.ownStatement, signature, new Meta.Frame(0, true, list), -1);
}
private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
@@ -124,10 +129,10 @@ public class LocalService implements Service {
return new PrepareResponse(h);
}
- public ResultSetResponse apply(PrepareAndExecuteRequest request) {
+ public ExecuteResponse apply(PrepareAndExecuteRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
- final Meta.MetaResultSet resultSet =
+ final Meta.ExecuteResult executeResult =
meta.prepareAndExecute(ch, request.sql, request.maxRowCount,
new Meta.PrepareCallback() {
@Override public Object getMonitor() {
@@ -138,13 +143,17 @@ public class LocalService implements Service {
}
@Override public void assign(Meta.Signature signature,
- Meta.Frame firstFrame) {
+ Meta.Frame firstFrame, int updateCount) {
}
@Override public void execute() {
}
});
- return toResponse(resultSet);
+ final List<ResultSetResponse> results = new ArrayList<>();
+ for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
+ results.add(toResponse(metaResultSet));
+ }
+ return new ExecuteResponse(results);
}
public FetchResponse apply(FetchRequest request) {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
index 02cb191..7cc5420 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -64,21 +64,21 @@ public class MockJsonService extends JsonService {
try {
map1.put(
"{\"request\":\"getSchemas\",\"catalog\":null,\"schemaPattern\":{\"s\":null}}",
- "{\"response\":\"resultSet\", firstFrame: {offset: 0, done: true, rows: []}}");
+ "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
map1.put(
JsonService.encode(new SchemasRequest(null, null)),
- "{\"response\":\"resultSet\", firstFrame: {offset: 0, done: true, rows: []}}");
+ "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
map1.put(
JsonService.encode(
new TablesRequest(null, null, null, Arrays.<String>asList())),
- "{\"response\":\"resultSet\", firstFrame: {offset: 0, done: true, rows: []}}");
+ "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
map1.put(
"{\"request\":\"createStatement\",\"connectionId\":0}",
"{\"response\":\"createStatement\",\"id\":0}");
map1.put(
"{\"request\":\"prepareAndExecute\",\"statementId\":0,"
+ "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}",
- "{\"response\":\"resultSet\",\"signature\": {\n"
+ "{\"response\":\"resultSet\", updateCount: -1, \"signature\": {\n"
+ " \"columns\": [\n"
+ " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n"
+ " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n"
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 93e5541..9a3eab6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -24,6 +24,7 @@ import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MetaImpl;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +44,10 @@ class RemoteMeta extends MetaImpl {
private MetaResultSet toResultSet(Class clazz,
Service.ResultSetResponse response) {
+ if (response.updateCount != -1) {
+ return MetaResultSet.count(response.connectionId, response.statementId,
+ response.updateCount);
+ }
Signature signature0 = response.signature;
if (signature0 == null) {
final List<ColumnMetaData> columns =
@@ -52,7 +57,7 @@ class RemoteMeta extends MetaImpl {
signature0 = Signature.create(columns,
"?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY);
}
- return new MetaResultSet(response.connectionId, response.statementId,
+ return MetaResultSet.create(response.connectionId, response.statementId,
response.ownStatement, signature0, response.firstFrame);
}
@@ -142,19 +147,27 @@ class RemoteMeta extends MetaImpl {
return response.statement;
}
- @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
+ @Override public ExecuteResult prepareAndExecute(ConnectionHandle ch,
String sql, int maxRowCount, PrepareCallback callback) {
connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection state if necessary
- final Service.ResultSetResponse response;
+ final Service.ExecuteResponse response;
try {
synchronized (callback.getMonitor()) {
callback.clear();
response = service.apply(
new Service.PrepareAndExecuteRequest(ch.id, sql, maxRowCount));
- callback.assign(response.signature, response.firstFrame);
+ if (response.results.size() > 0) {
+ final Service.ResultSetResponse result = response.results.get(0);
+ callback.assign(result.signature, result.firstFrame,
+ result.updateCount);
+ }
}
callback.execute();
- return toResultSet(null, response);
+ List<MetaResultSet> metaResultSets = new ArrayList<>();
+ for (Service.ResultSetResponse result : response.results) {
+ metaResultSets.add(toResultSet(null, result));
+ }
+ return new ExecuteResult(metaResultSets);
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index c85e53a..430450d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -36,7 +36,7 @@ public interface Service {
ResultSetResponse apply(TableTypesRequest request);
ResultSetResponse apply(ColumnsRequest request);
PrepareResponse apply(PrepareRequest request);
- ResultSetResponse apply(PrepareAndExecuteRequest request);
+ ExecuteResponse apply(PrepareAndExecuteRequest request);
FetchResponse apply(FetchRequest request);
CreateStatementResponse apply(CreateStatementRequest request);
CloseStatementResponse apply(CloseStatementRequest request);
@@ -148,8 +148,7 @@ public interface Service {
/** Request for
* {@link Meta#getTableTypes()}. */
class TableTypesRequest extends Request {
-
- ResultSetResponse accept(Service service) {
+ @Override ResultSetResponse accept(Service service) {
return service.apply(this);
}
}
@@ -181,6 +180,10 @@ public interface Service {
/** Response that contains a result set.
*
+ * <p>Regular result sets have {@code updateCount} -1;
+ * any other value means a dummy result set that is just a count, and has
+ * no signature and no other data.
+ *
* <p>Several types of request, including
* {@link org.apache.calcite.avatica.Meta#getCatalogs()} and
* {@link org.apache.calcite.avatica.Meta#getSchemas(String, org.apache.calcite.avatica.Meta.Pat)}
@@ -193,6 +196,7 @@ public interface Service {
public final boolean ownStatement;
public final Meta.Signature signature;
public final Meta.Frame firstFrame;
+ public final int updateCount;
@JsonCreator
public ResultSetResponse(
@@ -200,12 +204,14 @@ public interface Service {
@JsonProperty("statementId") int statementId,
@JsonProperty("ownStatement") boolean ownStatement,
@JsonProperty("signature") Meta.Signature signature,
- @JsonProperty("firstFrame") Meta.Frame firstFrame) {
+ @JsonProperty("firstFrame") Meta.Frame firstFrame,
+ @JsonProperty("updateCount") int updateCount) {
this.connectionId = connectionId;
this.statementId = statementId;
this.ownStatement = ownStatement;
this.signature = signature;
this.firstFrame = firstFrame;
+ this.updateCount = updateCount;
}
}
@@ -226,11 +232,23 @@ public interface Service {
this.maxRowCount = maxRowCount;
}
- @Override ResultSetResponse accept(Service service) {
+ @Override ExecuteResponse accept(Service service) {
return service.apply(this);
}
}
+ /** Response to a
+ * {@link org.apache.calcite.avatica.remote.Service.PrepareAndExecuteRequest}. */
+ class ExecuteResponse extends Response {
+ public final List<ResultSetResponse> results;
+
+ @JsonCreator
+ public ExecuteResponse(
+ @JsonProperty("resultSets") List<ResultSetResponse> results) {
+ this.results = results;
+ }
+ }
+
/** Request for
* {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int)}. */
class PrepareRequest extends Request {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index 2a9c4ad..90c711e 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -150,9 +150,9 @@ public class CalciteMetaImpl extends MetaImpl {
private <E> MetaResultSet createResultSet(Enumerable<E> enumerable,
Class clazz, String... names) {
- final List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();
- final List<Field> fields = new ArrayList<Field>();
- final List<String> fieldNames = new ArrayList<String>();
+ final List<ColumnMetaData> columns = new ArrayList<>();
+ final List<Field> fields = new ArrayList<>();
+ final List<String> fieldNames = new ArrayList<>();
for (String name : names) {
final int index = fields.size();
final String fieldName = AvaticaUtils.toCamelCase(name);
@@ -196,7 +196,7 @@ public class CalciteMetaImpl extends MetaImpl {
return Linq4j.asEnumerable(firstFrame.rows);
}
};
- return new MetaResultSet(connection.id, statement.getId(), true,
+ return MetaResultSet.create(connection.id, statement.getId(), true,
signature, firstFrame);
} catch (SQLException e) {
throw new RuntimeException(e);
@@ -476,7 +476,7 @@ public class CalciteMetaImpl extends MetaImpl {
return h;
}
- @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
+ @Override public ExecuteResult prepareAndExecute(ConnectionHandle ch,
String sql, int maxRowCount, PrepareCallback callback) {
final CalcitePrepare.CalciteSignature<Object> signature;
final StatementHandle h = createStatement(ch);
@@ -488,10 +488,12 @@ public class CalciteMetaImpl extends MetaImpl {
calciteConnection.server.getStatement(h);
signature = calciteConnection.parseQuery(sql,
statement.createPrepareContext(), maxRowCount);
- callback.assign(signature, null);
+ callback.assign(signature, null, -1);
}
callback.execute();
- return new MetaResultSet(h.connectionId, h.id, false, signature, null);
+ final MetaResultSet metaResultSet =
+ MetaResultSet.create(h.connectionId, h.id, false, signature, null);
+ return new ExecuteResult(ImmutableList.of(metaResultSet));
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0d80fd25/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
index 05f54b7..9521201 100644
--- a/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
+++ b/core/src/test/java/org/apache/calcite/jdbc/CalciteRemoteDriverTest.java
@@ -381,6 +381,23 @@ public class CalciteRemoteDriverTest {
}
}
+ /** Checks {@link Statement#execute} on a query over a remote connection.
+ *
+ * <p>Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-646">[CALCITE-646]
+ * AvaticaStatement execute method broken over remote JDBC</a>. */
+ @Test public void testRemoteStatementExecute() throws Exception {
+ final Statement statement = remoteConnection.createStatement();
+ final boolean status = statement.execute("values (1, 2), (3, 4), (5, 6)");
+ final ResultSet resultSet = statement.getResultSet();
+ int n = 0;
+ while (resultSet.next()) {
+ ++n;
+ }
+ assertThat(n, equalTo(3));
+
+ }
+
/** A bunch of sample values of various types. */
private static final List<Object> SAMPLE_VALUES =
ImmutableList.<Object>of(false, true,