You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/10/29 01:46:24 UTC
[6/6] calcite git commit: [CALCITE-903] Enable Avatica client to
recover from missing server-side state (Josh Elser)
[CALCITE-903] Enable Avatica client to recover from missing server-side state (Josh Elser)
Close apache/calcite#140
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/97df1acb
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/97df1acb
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/97df1acb
Branch: refs/heads/master
Commit: 97df1acbe82881f0fd196477dadbcbc173bddc20
Parents: 1a491b7
Author: Josh Elser <el...@apache.org>
Authored: Wed Sep 23 11:53:13 2015 -0400
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Oct 28 16:42:33 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 109 +-
.../calcite/avatica/jdbc/JdbcResultSet.java | 12 +-
.../calcite/avatica/jdbc/StatementInfo.java | 170 ++
.../calcite/avatica/jdbc/StatementInfoTest.java | 138 +
.../remote/AlternatingRemoteMetaTest.java | 392 +++
.../calcite/avatica/remote/RemoteMetaTest.java | 6 +-
.../calcite/avatica/AvaticaConnection.java | 77 +-
.../avatica/AvaticaDatabaseMetaData.java | 789 ++++-
.../apache/calcite/avatica/AvaticaFactory.java | 3 +-
.../calcite/avatica/AvaticaJdbc41Factory.java | 4 +-
.../avatica/AvaticaPreparedStatement.java | 11 +-
.../calcite/avatica/AvaticaResultSet.java | 5 +-
.../calcite/avatica/AvaticaStatement.java | 40 +-
.../java/org/apache/calcite/avatica/Meta.java | 17 +-
.../org/apache/calcite/avatica/MetaImpl.java | 69 +-
.../avatica/MissingResultsException.java | 41 +
.../avatica/NoSuchConnectionException.java | 37 +
.../avatica/NoSuchStatementException.java | 39 +
.../org/apache/calcite/avatica/QueryState.java | 489 +++
.../calcite/avatica/UnregisteredDriver.java | 2 +-
.../apache/calcite/avatica/proto/Common.java | 2847 +++++++++++++++++-
.../apache/calcite/avatica/proto/Requests.java | 770 ++++-
.../apache/calcite/avatica/proto/Responses.java | 769 ++++-
.../calcite/avatica/remote/AbstractHandler.java | 5 +
.../calcite/avatica/remote/AbstractService.java | 5 +-
.../avatica/remote/AvaticaHttpClient.java | 34 +
.../avatica/remote/AvaticaHttpClientImpl.java | 73 +
.../apache/calcite/avatica/remote/Driver.java | 65 +-
.../apache/calcite/avatica/remote/Handler.java | 4 +
.../calcite/avatica/remote/JsonService.java | 8 +
.../calcite/avatica/remote/LocalService.java | 99 +-
.../avatica/remote/MetaDataOperation.java | 181 ++
.../calcite/avatica/remote/MockJsonService.java | 2 +-
.../avatica/remote/MockProtobufService.java | 2 +-
.../calcite/avatica/remote/ProtobufService.java | 4 +
.../avatica/remote/ProtobufTranslationImpl.java | 6 +
.../calcite/avatica/remote/RemoteMeta.java | 368 ++-
.../avatica/remote/RemoteProtobufService.java | 42 +-
.../calcite/avatica/remote/RemoteService.java | 45 +-
.../apache/calcite/avatica/remote/Service.java | 227 +-
avatica/src/main/protobuf/common.proto | 64 +
avatica/src/main/protobuf/requests.proto | 7 +
avatica/src/main/protobuf/responses.proto | 8 +
.../calcite/avatica/AvaticaConnectionTest.java | 60 +
.../apache/calcite/avatica/QueryStateTest.java | 513 ++++
.../avatica/remote/AvaticaHttpClientTest.java | 93 +
.../avatica/remote/MetaDataOperationTest.java | 37 +
.../avatica/remote/ProtobufHandlerTest.java | 2 +-
.../remote/ProtobufTranslationImplTest.java | 37 +-
.../calcite/avatica/test/JsonHandlerTest.java | 6 +-
.../calcite/jdbc/CalciteJdbc41Factory.java | 3 +-
.../apache/calcite/jdbc/CalciteMetaImpl.java | 20 +-
.../apache/calcite/jdbc/CalciteResultSet.java | 2 +-
src/main/config/checkstyle/checker.xml | 4 -
54 files changed, 8302 insertions(+), 560 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index b8e4ea4..1bfd7f6 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -23,6 +23,10 @@ import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.ConnectionPropertiesImpl;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchConnectionException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.avatica.remote.TypedValue;
@@ -51,7 +55,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,7 +67,7 @@ public class JdbcMeta implements Meta {
private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache";
- /** Special value for {@link Statement#getLargeMaxRows()} that means fetch
+ /** Special value for {@code Statement#getLargeMaxRows()} that means fetch
* an unlimited number of rows in a single batch.
*
* <p>Any other negative value will return an unlimited number of rows but
@@ -290,7 +293,7 @@ public class JdbcMeta implements Meta {
private int registerMetaStatement(ResultSet rs) throws SQLException {
final int id = statementIdGenerator.getAndIncrement();
StatementInfo statementInfo = new StatementInfo(rs.getStatement());
- statementInfo.resultSet = rs;
+ statementInfo.setResultSet(rs);
statementCache.put(id, statementInfo);
return id;
}
@@ -508,7 +511,7 @@ public class JdbcMeta implements Meta {
return null;
}
- public Iterable<Object> createIterable(StatementHandle handle,
+ public Iterable<Object> createIterable(StatementHandle handle, QueryState state,
Signature signature, List<TypedValue> parameterValues, Frame firstFrame) {
return null;
}
@@ -519,7 +522,8 @@ public class JdbcMeta implements Meta {
}
Connection conn = connectionCache.getIfPresent(id);
if (conn == null) {
- throw new RuntimeException("Connection not found: invalid id, closed, or expired: " + id);
+ throw new NoSuchConnectionException("Connection not found: invalid id, closed, or expired: "
+ + id);
}
return conn;
}
@@ -550,8 +554,9 @@ public class JdbcMeta implements Meta {
LOG.trace("closing statement " + h);
}
try {
- if (info.resultSet != null) {
- info.resultSet.close();
+ ResultSet results = info.getResultSet();
+ if (info.isResultSetInitialized() && null != results) {
+ results.close();
}
info.statement.close();
} catch (SQLException e) {
@@ -674,12 +679,11 @@ public class JdbcMeta implements Meta {
}
public ExecuteResult prepareAndExecute(StatementHandle h, String sql,
- long maxRowCount, PrepareCallback callback) {
+ long maxRowCount, PrepareCallback callback) throws NoSuchStatementException {
try {
final StatementInfo info = statementCache.getIfPresent(h.id);
if (info == null) {
- throw new RuntimeException("Statement not found, potentially expired. "
- + h);
+ throw new NoSuchStatementException(h);
}
final Statement statement = info.statement;
// Special handling of maxRowCount as JDBC 0 is unlimited, our meta 0 row
@@ -689,18 +693,18 @@ public class JdbcMeta implements Meta {
statement.setMaxRows(0);
}
boolean ret = statement.execute(sql);
- info.resultSet = statement.getResultSet();
- assert ret || info.resultSet == null;
+ info.setResultSet(statement.getResultSet());
+ // Either execute(sql) returned true or the resultSet was null
+ assert ret || null == info.getResultSet();
final List<MetaResultSet> resultSets = new ArrayList<>();
- if (info.resultSet == null) {
+ if (null == info.getResultSet()) {
// Create a special result set that just carries update count
resultSets.add(
JdbcResultSet.count(h.connectionId, h.id,
AvaticaUtils.getLargeUpdateCount(statement)));
} else {
resultSets.add(
- JdbcResultSet.create(h.connectionId, h.id, info.resultSet,
- maxRowCount));
+ JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), maxRowCount));
}
if (LOG.isTraceEnabled()) {
LOG.trace("prepAndExec statement " + h);
@@ -712,19 +716,51 @@ public class JdbcMeta implements Meta {
}
}
- public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) {
+ public boolean syncResults(StatementHandle sh, QueryState state, long offset)
+ throws NoSuchStatementException {
+ try {
+ final Connection conn = getConnection(sh.connectionId);
+ final StatementInfo info = statementCache.getIfPresent(sh.id);
+ if (null == info) {
+ throw new NoSuchStatementException(sh);
+ }
+ final Statement statement = info.statement;
+ // Let the state recreate the necessary ResultSet on the Statement
+ info.setResultSet(state.invoke(conn, statement));
+
+ if (null != info.getResultSet()) {
+ // If it is non-null, try to advance to the requested offset.
+ return info.advanceResultSetToOffset(info.getResultSet(), offset);
+ }
+
+ // No results, nothing to do. Client can move on.
+ return false;
+ } catch (SQLException e) {
+ throw propagate(e);
+ }
+ }
+
+ public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws
+ NoSuchStatementException, MissingResultsException {
if (LOG.isTraceEnabled()) {
LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:"
+ fetchMaxRowCount);
}
try {
- final StatementInfo statementInfo = Objects.requireNonNull(
- statementCache.getIfPresent(h.id),
- "Statement not found, potentially expired. " + h);
- if (statementInfo.resultSet == null) {
+ final StatementInfo statementInfo = statementCache.getIfPresent(h.id);
+ if (null == statementInfo) {
+ // Statement might have expired, or never existed on this server.
+ throw new NoSuchStatementException(h);
+ }
+
+ if (!statementInfo.isResultSetInitialized()) {
+ // The Statement exists, but the results are missing. Need to call syncResults(...)
+ throw new MissingResultsException(h);
+ }
+ if (statementInfo.getResultSet() == null) {
return Frame.EMPTY;
} else {
- return JdbcResultSet.frame(statementInfo.resultSet, offset,
+ return JdbcResultSet.frame(statementInfo, statementInfo.getResultSet(), offset,
fetchMaxRowCount, calendar);
}
} catch (SQLException e) {
@@ -740,15 +776,16 @@ public class JdbcMeta implements Meta {
}
@Override public ExecuteResult execute(StatementHandle h,
- List<TypedValue> parameterValues, long maxRowCount) {
+ List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException {
try {
if (MetaImpl.checkParameterValueHasNull(parameterValues)) {
throw new SQLException("exception while executing query: unbound parameter");
}
- final StatementInfo statementInfo = Objects.requireNonNull(
- statementCache.getIfPresent(h.id),
- "Statement not found, potentially expired. " + h);
+ final StatementInfo statementInfo = statementCache.getIfPresent(h.id);
+ if (null == statementInfo) {
+ throw new NoSuchStatementException(h);
+ }
final List<MetaResultSet> resultSets = new ArrayList<>();
final PreparedStatement preparedStatement =
(PreparedStatement) statementInfo.statement;
@@ -772,14 +809,16 @@ public class JdbcMeta implements Meta {
signature2 = h.signature;
}
- statementInfo.resultSet = preparedStatement.getResultSet();
- if (statementInfo.resultSet == null) {
+ // Make sure we set this for subsequent fetch()'s to find the result set.
+ statementInfo.setResultSet(preparedStatement.getResultSet());
+
+ if (statementInfo.getResultSet() == null) {
frame = Frame.EMPTY;
resultSets.add(JdbcResultSet.empty(h.connectionId, h.id, signature2));
} else {
resultSets.add(
JdbcResultSet.create(h.connectionId, h.id,
- statementInfo.resultSet, maxRowCount, signature2));
+ statementInfo.getResultSet(), maxRowCount, signature2));
}
} else {
resultSets.add(
@@ -793,16 +832,6 @@ public class JdbcMeta implements Meta {
}
}
- /** All we know about a statement. */
- private static class StatementInfo {
- final Statement statement; // sometimes a PreparedStatement
- ResultSet resultSet;
-
- private StatementInfo(Statement statement) {
- this.statement = Objects.requireNonNull(statement);
- }
- }
-
/** Configurable statement cache settings. */
public enum StatementCacheSettings {
/** JDBC connection property for setting connection cache concurrency level. */
@@ -917,8 +946,8 @@ public class JdbcMeta implements Meta {
+ notification.getCause());
}
try {
- if (doomed.resultSet != null) {
- doomed.resultSet.close();
+ if (doomed.getResultSet() != null) {
+ doomed.getResultSet().close();
}
if (doomed.statement != null) {
doomed.statement.close();
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index 30ee7f4..6630124 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -88,7 +88,7 @@ class JdbcResultSet extends Meta.MetaResultSet {
} else {
fetchRowCount = (int) maxRowCount;
}
- final Meta.Frame firstFrame = frame(resultSet, 0, fetchRowCount, calendar);
+ final Meta.Frame firstFrame = frame(null, resultSet, 0, fetchRowCount, calendar);
if (firstFrame.done) {
resultSet.close();
}
@@ -114,7 +114,7 @@ class JdbcResultSet extends Meta.MetaResultSet {
/** Creates a frame containing a given number or unlimited number of rows
* from a result set. */
- static Meta.Frame frame(ResultSet resultSet, long offset,
+ static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset,
int fetchMaxRowCount, Calendar calendar) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
@@ -126,7 +126,13 @@ class JdbcResultSet extends Meta.MetaResultSet {
// Meta prepare/prepareAndExecute 0 return 0 row and done
boolean done = fetchMaxRowCount == 0;
for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) {
- if (!resultSet.next()) {
+ final boolean hasRow;
+ if (null != info) {
+ hasRow = info.next();
+ } else {
+ hasRow = resultSet.next();
+ }
+ if (!hasRow) {
done = true;
resultSet.close();
break;
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
new file mode 100644
index 0000000..ff27d05
--- /dev/null
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.Objects;
+
+/**
+ * All we know about a statement. Encapsulates a {@link ResultSet}.
+ */
+public class StatementInfo {
+ private volatile Boolean relativeSupported = null;
+
+ final Statement statement; // sometimes a PreparedStatement
+ private ResultSet resultSet;
+ private long position = 0;
+
+ // True once setResultSet(ResultSet) has been called. Lets us distinguish
+ // a null ResultSet (from an update) from the lack of a ResultSet.
+ private boolean resultsInitialized = false;
+
+ public StatementInfo(Statement statement) {
+ this.statement = Objects.requireNonNull(statement);
+ }
+
+ // Visible for testing
+ void setPosition(long position) {
+ this.position = position;
+ }
+
+ // Visible for testing
+ long getPosition() {
+ return this.position;
+ }
+
+ /**
+ * Set a ResultSet on this object.
+ *
+ * @param resultSet The current ResultSet
+ */
+ public void setResultSet(ResultSet resultSet) {
+ resultsInitialized = true;
+ this.resultSet = resultSet;
+ }
+
+ /**
+ * @return The {@link ResultSet} for this Statement, may be null.
+ */
+ public ResultSet getResultSet() {
+ return this.resultSet;
+ }
+
+ /**
+ * @return True if {@link #setResultSet(ResultSet)} was ever invoked.
+ */
+ public boolean isResultSetInitialized() {
+ return resultsInitialized;
+ }
+
+ /**
+ * @see ResultSet#next()
+ */
+ public boolean next() throws SQLException {
+ return _next(resultSet);
+ }
+
+ boolean _next(ResultSet results) throws SQLException {
+ boolean ret = results.next();
+ position++;
+ return ret;
+ }
+
+ /**
+ * Consumes <code>offset - position</code> elements from the {@link ResultSet}.
+ *
+ * @param offset The offset to advance to
+ * @return True if the resultSet was advanced to the requested offset, false if insufficient
+ * rows were present to advance that far.
+ */
+ public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException {
+ if (offset < 0 || offset < position) {
+ throw new IllegalArgumentException("Offset should be "
+ + " non-negative and not less than the current position. " + offset + ", " + position);
+ }
+ if (position >= offset) {
+ return true;
+ }
+
+ if (null == relativeSupported) {
+ Boolean moreResults = null;
+ synchronized (this) {
+ if (null == relativeSupported) {
+ try {
+ moreResults = advanceByRelative(results, offset);
+ relativeSupported = true;
+ } catch (SQLFeatureNotSupportedException e) {
+ relativeSupported = false;
+ }
+ }
+ }
+
+ if (null != moreResults) {
+ // We figured out whether or not relative is supported.
+ // Make sure we actually do the necessary work.
+ if (!relativeSupported) {
+ // We avoided calling advanceByNext in the synchronized block earlier.
+ moreResults = advanceByNext(results, offset);
+ }
+
+ return moreResults;
+ }
+
+ // Another thread set relativeSupported before we did, fall through.
+ }
+
+ if (relativeSupported) {
+ return advanceByRelative(results, offset);
+ } else {
+ return advanceByNext(results, offset);
+ }
+ }
+
+ private boolean advanceByRelative(ResultSet results, long offset) throws SQLException {
+ long diff = offset - position;
+ while (diff > Integer.MAX_VALUE) {
+ if (!results.relative(Integer.MAX_VALUE)) {
+ // NOTE: position is still advanced here even though relative() returned false.
+ position += Integer.MAX_VALUE;
+ return false;
+ }
+ // Avoid updating position until relative succeeds.
+ position += Integer.MAX_VALUE;
+ diff -= Integer.MAX_VALUE;
+ }
+ boolean ret = results.relative((int) diff);
+ // Make sure we only update the position after successfully calling relative(int).
+ position += diff;
+ return ret;
+ }
+
+ private boolean advanceByNext(ResultSet results, long offset) throws SQLException {
+ while (position < offset) {
+ // Advance while maintaining `position`
+ if (!_next(results)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
+
+// End StatementInfo.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
new file mode 100644
index 0000000..2984692
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.sql.ResultSet;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests covering {@link StatementInfo}.
+ */
+public class StatementInfoTest {
+
+ @Test
+ public void testLargeOffsets() throws Exception {
+ Statement stmt = Mockito.mock(Statement.class);
+ ResultSet results = Mockito.mock(ResultSet.class);
+
+ StatementInfo info = new StatementInfo(stmt);
+
+ Mockito.when(results.relative(Integer.MAX_VALUE)).thenReturn(true, true);
+ Mockito.when(results.relative(1)).thenReturn(true);
+
+ long offset = 1L + Integer.MAX_VALUE + Integer.MAX_VALUE;
+ assertTrue(info.advanceResultSetToOffset(results, offset));
+
+ InOrder inOrder = Mockito.inOrder(results);
+
+ inOrder.verify(results, Mockito.times(2)).relative(Integer.MAX_VALUE);
+ inOrder.verify(results).relative(1);
+
+ assertEquals(offset, info.getPosition());
+ }
+
+ @Test
+ public void testNextUpdatesPosition() throws Exception {
+ Statement stmt = Mockito.mock(Statement.class);
+ ResultSet results = Mockito.mock(ResultSet.class);
+
+ StatementInfo info = new StatementInfo(stmt);
+ info.setResultSet(results);
+
+ Mockito.when(results.next()).thenReturn(true, true, true, false);
+
+ for (int i = 0; i < 3; i++) {
+ assertTrue(i + "th call of next() should return true", info.next());
+ assertEquals(info.getPosition(), i + 1);
+ }
+
+ assertFalse("Expected last next() to return false", info.next());
+ assertEquals(info.getPosition(), 4L);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoMovement() throws Exception {
+ Statement stmt = Mockito.mock(Statement.class);
+ ResultSet results = Mockito.mock(ResultSet.class);
+
+ StatementInfo info = new StatementInfo(stmt);
+ info.setPosition(500);
+
+ info.advanceResultSetToOffset(results, 400);
+ }
+
+ @Test public void testResultSetGetter() throws Exception {
+ Statement stmt = Mockito.mock(Statement.class);
+ ResultSet results = Mockito.mock(ResultSet.class);
+
+ StatementInfo info = new StatementInfo(stmt);
+
+ assertFalse("ResultSet should not be initialized", info.isResultSetInitialized());
+ assertNull("ResultSet should be null", info.getResultSet());
+
+ info.setResultSet(results);
+
+ assertTrue("ResultSet should be initialized", info.isResultSetInitialized());
+ assertEquals(results, info.getResultSet());
+ }
+
+ @Test public void testCheckPositionAfterFailedRelative() throws Exception {
+ Statement stmt = Mockito.mock(Statement.class);
+ ResultSet results = Mockito.mock(ResultSet.class);
+ final long offset = 500;
+
+ StatementInfo info = new StatementInfo(stmt);
+ info.setResultSet(results);
+
+ // relative() doesn't work
+ Mockito.when(results.relative((int) offset)).thenThrow(new SQLFeatureNotSupportedException());
+ // Should fall back to next(): 500 calls to next(), the last of which returns false
+ Mockito.when(results.next()).then(new Answer<Boolean>() {
+ private long invocations = 0;
+
+ // Return true for the first 499 calls, false from the 500th call on.
+ @Override public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ invocations++;
+ if (invocations >= offset) {
+ return false;
+ }
+ return true;
+ }
+ });
+
+ info.advanceResultSetToOffset(results, offset);
+
+ // Verify correct position
+ assertEquals(offset, info.getPosition());
+ // Make sure that we actually advanced the result set
+ Mockito.verify(results, Mockito.times(500)).next();
+ }
+}
+
+// End StatementInfoTest.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
new file mode 100644
index 0000000..df2862d
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.ConnectionConfig;
+import org.apache.calcite.avatica.ConnectionPropertiesImpl;
+import org.apache.calcite.avatica.ConnectionSpec;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.server.AvaticaHandler;
+import org.apache.calcite.avatica.server.HttpServer;
+import org.apache.calcite.avatica.server.Main;
+import org.apache.calcite.avatica.server.Main.HandlerFactory;
+
+import com.google.common.cache.Cache;
+
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that verify that the Driver still functions when requests are randomly bounced between
+ * more than one server.
+ */
+public class AlternatingRemoteMetaTest {
+ private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB;
+
+ private static String url;
+
+ static {
+ try {
+ DriverManager.registerDriver(new AlternatingDriver());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Keep a reference to the servers we start to clean them up after
+ private static final List<HttpServer> ACTIVE_SERVERS = new ArrayList<>();
+
+ /** Factory that provides a {@link JdbcMeta}. */
+ public static class FullyRemoteJdbcMetaFactory implements Meta.Factory {
+
+ private static JdbcMeta instance = null;
+
+ private static JdbcMeta getInstance() {
+ if (instance == null) {
+ try {
+ instance = new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
+ CONNECTION_SPEC.password);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return instance;
+ }
+
+ @Override public Meta create(List<String> args) {
+ return getInstance();
+ }
+ }
+
+ /**
+ * AvaticaHttpClient implementation that randomly chooses among the provided URLs.
+ */
+ public static class AlternatingAvaticaHttpClient implements AvaticaHttpClient {
+ private final List<AvaticaHttpClientImpl> clients;
+ private final Random r = new Random();
+
+ public AlternatingAvaticaHttpClient(List<URL> urls) {
+ //System.out.println("Constructing clients for " + urls);
+ clients = new ArrayList<>(urls.size());
+ for (URL url : urls) {
+ clients.add(new AvaticaHttpClientImpl(url));
+ }
+ }
+
+ public byte[] send(byte[] request) {
+ AvaticaHttpClientImpl client = clients.get(r.nextInt(clients.size()));
+ //System.out.println("URL: " + client.url);
+ return client.send(request);
+ }
+
+ }
+
+ /**
+ * Driver implementation that uses {@link AlternatingAvaticaHttpClient}.
+ */
+ public static class AlternatingDriver extends Driver {
+
+ public static final String PREFIX = "jdbc:avatica:remote-alternating:";
+
+ @Override protected String getConnectStringPrefix() {
+ return PREFIX;
+ }
+
+ @Override public Meta createMeta(AvaticaConnection connection) {
+ final ConnectionConfig config = connection.config();
+ return new RemoteMeta(connection, new RemoteService(getHttpClient(connection, config)));
+ }
+
+ @Override AvaticaHttpClient getHttpClient(AvaticaConnection connection,
+ ConnectionConfig config) {
+ return new AlternatingAvaticaHttpClient(parseUrls(config.url()));
+ }
+
+ List<URL> parseUrls(String urlStr) {
+ final List<URL> urls = new ArrayList<>();
+ final char comma = ',';
+
+ int prevIndex = 0;
+ int index = urlStr.indexOf(comma);
+ if (-1 == index) {
+ try {
+ return Collections.singletonList(new URL(urlStr));
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // String split w/o regex
+ while (-1 != index) {
+ try {
+ urls.add(new URL(urlStr.substring(prevIndex, index)));
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ prevIndex = index + 1;
+ index = urlStr.indexOf(comma, prevIndex);
+ }
+
+ // Get the last one
+ try {
+ urls.add(new URL(urlStr.substring(prevIndex)));
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return urls;
+ }
+
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ final String[] mainArgs = new String[] { FullyRemoteJdbcMetaFactory.class.getName() };
+
+ // Bind to '0' to pluck an ephemeral port instead of expecting a certain one to be free
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 2; i++) {
+ if (sb.length() > 0) {
+ sb.append(",");
+ }
+ HttpServer jsonServer = Main.start(mainArgs, 0, new HandlerFactory() {
+ @Override public AbstractHandler createHandler(Service service) {
+ return new AvaticaHandler(service);
+ }
+ });
+ ACTIVE_SERVERS.add(jsonServer);
+ sb.append("http://localhost:").append(jsonServer.getPort());
+ }
+
+ url = AlternatingDriver.PREFIX + "url=" + sb.toString();
+ }
+
+ @AfterClass public static void afterClass() throws Exception {
+ for (HttpServer server : ACTIVE_SERVERS) {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+
+ private static Meta getMeta(AvaticaConnection conn) throws Exception {
+ Field f = AvaticaConnection.class.getDeclaredField("meta");
+ f.setAccessible(true);
+ return (Meta) f.get(conn);
+ }
+
+ private static Meta.ExecuteResult prepareAndExecuteInternal(AvaticaConnection conn,
+ final AvaticaStatement statement, String sql, int maxRowCount) throws Exception {
+ Method m =
+ AvaticaConnection.class.getDeclaredMethod("prepareAndExecuteInternal",
+ AvaticaStatement.class, String.class, long.class);
+ m.setAccessible(true);
+ return (Meta.ExecuteResult) m.invoke(conn, statement, sql, maxRowCount);
+ }
+
+ private static Connection getConnection(JdbcMeta m, String id) throws Exception {
+ Field f = JdbcMeta.class.getDeclaredField("connectionCache");
+ f.setAccessible(true);
+ //noinspection unchecked
+ Cache<String, Connection> connectionCache = (Cache<String, Connection>) f.get(m);
+ return connectionCache.getIfPresent(id);
+ }
+
+ /** Executes a two-row query with {@code maxRowCount} of 0 and checks that no
+ * rows come back while the result set metadata still reports two columns. */
+ @Test public void testRemoteExecuteMaxRowCount() throws Exception {
+ ConnectionSpec.getDatabaseLock().lock();
+ // try-with-resources closes the statement and the connection (in that order)
+ // even when an assertion below fails.
+ try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+ AvaticaStatement statement = conn.createStatement()) {
+ prepareAndExecuteInternal(conn, statement,
+ "select * from (values ('a', 1), ('b', 2))", 0);
+ try (ResultSet rs = statement.getResultSet()) {
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ // JUnit's assertEquals takes (message, expected, actual); keeping that
+ // order makes the failure message report the right value as "expected".
+ assertEquals("Check maxRowCount=0 and ResultSets is 0 row", 0, count);
+ assertEquals("Check result set meta is still there",
+ 2, rs.getMetaData().getColumnCount());
+ }
+ } finally {
+ ConnectionSpec.getDatabaseLock().unlock();
+ }
+ }
+
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-780">[CALCITE-780]
+ * HTTP error 413 when sending a long string to the Avatica server</a>. */
+ @Test public void testRemoteExecuteVeryLargeQuery() throws Exception {
+ ConnectionSpec.getDatabaseLock().lock();
+ try {
+ // Before the bug was fixed, a value over 7998 caused an HTTP 413.
+ // 16K bytes, I guess.
+ checkLargeQuery(8);
+ checkLargeQuery(240);
+ checkLargeQuery(8000);
+ checkLargeQuery(240000);
+ } finally {
+ ConnectionSpec.getDatabaseLock().unlock();
+ }
+ }
+
+ private void checkLargeQuery(int n) throws Exception {
+ try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) {
+ final AvaticaStatement statement = conn.createStatement();
+ final String frenchDisko = "It said human existence is pointless\n"
+ + "As acts of rebellious solidarity\n"
+ + "Can bring sense in this world\n"
+ + "La resistance!\n";
+ final String sql = "select '"
+ + longString(frenchDisko, n)
+ + "' as s from (values 'x')";
+ prepareAndExecuteInternal(conn, statement, sql, -1);
+ ResultSet rs = statement.getResultSet();
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ assertThat(count, is(1));
+ rs.close();
+ statement.close();
+ conn.close();
+ }
+ }
+
+ /** Creates a string of exactly {@code length} characters by concatenating
+ * {@code fragment}. */
+ private static String longString(String fragment, int length) {
+ assert fragment.length() > 0;
+ final StringBuilder buf = new StringBuilder();
+ while (buf.length() < length) {
+ buf.append(fragment);
+ }
+ buf.setLength(length);
+ return buf.toString();
+ }
+
+ @Test public void testRemoteConnectionProperties() throws Exception {
+ ConnectionSpec.getDatabaseLock().lock();
+ try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) {
+ String id = conn.id;
+ final Map<String, ConnectionPropertiesImpl> m = ((RemoteMeta) getMeta(conn)).propsMap;
+ assertFalse("remote connection map should start ignorant", m.containsKey(id));
+ // force creating a connection object on the remote side.
+ try (final Statement stmt = conn.createStatement()) {
+ assertTrue("creating a statement starts a local object.", m.containsKey(id));
+ assertTrue(stmt.execute("select count(1) from EMP"));
+ }
+ Connection remoteConn = getConnection(FullyRemoteJdbcMetaFactory.getInstance(), id);
+ final boolean defaultRO = remoteConn.isReadOnly();
+ final boolean defaultAutoCommit = remoteConn.getAutoCommit();
+ final String defaultCatalog = remoteConn.getCatalog();
+ final String defaultSchema = remoteConn.getSchema();
+ conn.setReadOnly(!defaultRO);
+ assertTrue("local changes dirty local state", m.get(id).isDirty());
+ assertEquals("remote connection has not been touched", defaultRO, remoteConn.isReadOnly());
+ conn.setAutoCommit(!defaultAutoCommit);
+ assertEquals("remote connection has not been touched",
+ defaultAutoCommit, remoteConn.getAutoCommit());
+
+ // further interaction with the connection will force a sync
+ try (final Statement stmt = conn.createStatement()) {
+ assertEquals(!defaultAutoCommit, remoteConn.getAutoCommit());
+ assertFalse("local values should be clean", m.get(id).isDirty());
+ }
+ } finally {
+ ConnectionSpec.getDatabaseLock().unlock();
+ }
+ }
+
+ @Test public void testQuery() throws Exception {
+ ConnectionSpec.getDatabaseLock().lock();
+ try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+ Statement statement = conn.createStatement()) {
+ assertFalse(statement.execute("SET SCHEMA \"SCOTT\""));
+ assertFalse(
+ statement.execute(
+ "CREATE TABLE \"FOO\"(\"KEY\" INTEGER NOT NULL, \"VALUE\" VARCHAR(10))"));
+ assertFalse(statement.execute("SET TABLE \"FOO\" READONLY FALSE"));
+
+ final int numRecords = 1000;
+ for (int i = 0; i < numRecords; i++) {
+ assertFalse(statement.execute("INSERT INTO \"FOO\" VALUES(" + i + ", '" + i + "')"));
+ }
+
+ // Make sure all the records are there that we expect
+ ResultSet results = statement.executeQuery("SELECT count(KEY) FROM FOO");
+ assertTrue(results.next());
+ assertEquals(1000, results.getInt(1));
+ assertFalse(results.next());
+
+ results = statement.executeQuery("SELECT KEY, VALUE FROM FOO ORDER BY KEY ASC");
+ for (int i = 0; i < numRecords; i++) {
+ assertTrue(results.next());
+ assertEquals(i, results.getInt(1));
+ assertEquals(Integer.toString(i), results.getString(2));
+ }
+ } finally {
+ ConnectionSpec.getDatabaseLock().unlock();
+ }
+ }
+
+ @Test public void testSingleUrlParsing() throws Exception {
+ AlternatingDriver d = new AlternatingDriver();
+ List<URL> urls = d.parseUrls("http://localhost:1234");
+ assertEquals(Arrays.asList(new URL("http://localhost:1234")), urls);
+ }
+
+ @Test public void testMultipleUrlParsing() throws Exception {
+ AlternatingDriver d = new AlternatingDriver();
+ List<URL> urls = d.parseUrls("http://localhost:1234,http://localhost:2345,"
+ + "http://localhost:3456");
+ List<URL> expectedUrls = Arrays.asList(new URL("http://localhost:1234"),
+ new URL("http://localhost:2345"), new URL("http://localhost:3456"));
+ assertEquals(expectedUrls, urls);
+ }
+}
+
+// End AlternatingRemoteMetaTest.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
index e4d8690..c68f3cf 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
@@ -35,6 +35,7 @@ import com.google.common.cache.Cache;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.AfterClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -344,6 +345,7 @@ public class RemoteMetaTest {
}
}
+ @Ignore("[CALCITE-942] AvaticaConnection should fail-fast when closed.")
@Test public void testRemoteConnectionClosing() throws Exception {
AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
// Verify connection is usable
@@ -354,9 +356,9 @@ public class RemoteMetaTest {
try {
conn.createStatement();
fail("expected exception");
- } catch (RuntimeException e) {
+ } catch (SQLException e) {
assertThat(e.getMessage(),
- containsString("Connection not found: invalid id, closed, or expired"));
+ containsString("Connection is closed"));
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index a8867c8..03f2aa1 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -16,7 +16,10 @@
*/
package org.apache.calcite.avatica;
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
import org.apache.calcite.avatica.Meta.MetaResultSet;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest;
import org.apache.calcite.avatica.remote.TypedValue;
import java.sql.Array;
@@ -57,6 +60,9 @@ public abstract class AvaticaConnection implements Connection {
* the number of rows modified. */
public static final String ROWCOUNT_COLUMN_NAME = "ROWCOUNT";
+ public static final String NUM_EXECUTE_RETRIES_KEY = "avatica.statement.retries";
+ public static final String NUM_EXECUTE_RETRIES_DEFAULT = "5";
+
/** The name of the sole column returned by an EXPLAIN statement.
*
* <p>Actually Avatica does not care what this column is called, but here is
@@ -80,6 +86,7 @@ public abstract class AvaticaConnection implements Connection {
public final Map<InternalProperty, Object> properties = new HashMap<>();
public final Map<Integer, AvaticaStatement> statementMap =
new ConcurrentHashMap<>();
+ protected final long maxRetriesPerExecute;
/**
* Creates an AvaticaConnection.
@@ -105,6 +112,14 @@ public abstract class AvaticaConnection implements Connection {
this.meta = driver.createMeta(this);
this.metaData = factory.newDatabaseMetaData(this);
this.holdability = metaData.getResultSetHoldability();
+ this.maxRetriesPerExecute = getNumStatementRetries(info);
+ }
+
+ /** Computes the number of times
+ * {@link #executeInternal(String)} should retry before failing. */
+ long getNumStatementRetries(Properties props) {
+ return Long.valueOf(Objects.requireNonNull(props)
+ .getProperty(NUM_EXECUTE_RETRIES_KEY, NUM_EXECUTE_RETRIES_DEFAULT));
}
/** Returns a view onto this connection's configuration properties. Code
@@ -116,6 +131,14 @@ public abstract class AvaticaConnection implements Connection {
return new ConnectionConfigImpl(info);
}
+ /**
+ * Opens the connection on the server.
+ */
+ public void openConnection() {
+ // Open the connection on the server
+ this.meta.openConnection(handle, OpenConnectionRequest.serializeProperties(info));
+ }
+
// Connection methods
public AvaticaStatement createStatement() throws SQLException {
@@ -412,11 +435,12 @@ public abstract class AvaticaConnection implements Connection {
* @param statement Statement
* @param signature Prepared query
* @param firstFrame First frame of rows, or null if we need to execute
+ * @param state The state used to create the given result
* @return Result set
* @throws java.sql.SQLException if a database error occurs
*/
protected ResultSet executeQueryInternal(AvaticaStatement statement,
- Meta.Signature signature, Meta.Frame firstFrame) throws SQLException {
+ Meta.Signature signature, Meta.Frame firstFrame, QueryState state) throws SQLException {
// Close the previous open result set, if there is one.
Meta.Frame frame = firstFrame;
Meta.Signature signature2 = signature;
@@ -453,8 +477,9 @@ public abstract class AvaticaConnection implements Connection {
if (frame == null && signature2 == null && statement.updateCount != -1) {
statement.openResultSet = null;
} else {
+ // Duplicative SQL, to support non-prepared statements
statement.openResultSet =
- factory.newResultSet(statement, signature2, timeZone, frame);
+ factory.newResultSet(statement, state, signature2, timeZone, frame);
}
}
// Release the monitor before executing, to give another thread the
@@ -502,8 +527,8 @@ public abstract class AvaticaConnection implements Connection {
}
protected Meta.ExecuteResult prepareAndExecuteInternal(
- final AvaticaStatement statement, String sql, long maxRowCount)
- throws SQLException {
+ final AvaticaStatement statement, final String sql, long maxRowCount)
+ throws SQLException, NoSuchStatementException {
final Meta.PrepareCallback callback =
new Meta.PrepareCallback() {
public Object getMonitor() {
@@ -531,7 +556,7 @@ public abstract class AvaticaConnection implements Connection {
statement.updateCount = updateCount;
} else {
final TimeZone timeZone = getTimeZone();
- statement.openResultSet = factory.newResultSet(statement,
+ statement.openResultSet = factory.newResultSet(statement, new QueryState(sql),
signature, timeZone, firstFrame);
}
}
@@ -546,13 +571,13 @@ public abstract class AvaticaConnection implements Connection {
return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback);
}
- protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
+ protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState state)
throws SQLException {
final Meta.StatementHandle h = new Meta.StatementHandle(
metaResultSet.connectionId, metaResultSet.statementId, null);
final AvaticaStatement statement = lookupStatement(h);
ResultSet resultSet = executeQueryInternal(statement, metaResultSet.signature.sanitize(),
- metaResultSet.firstFrame);
+ metaResultSet.firstFrame, state);
if (metaResultSet.ownStatement) {
resultSet.getStatement().closeOnCompletion();
}
@@ -617,6 +642,44 @@ public abstract class AvaticaConnection implements Connection {
return connection.meta;
}
}
+
+ /**
+ * A Callable-like interface but without a "throws Exception".
+ *
+ * @param <T> The return type from {@code call}.
+ */
+ public interface CallableWithoutException<T> {
+ T call();
+ }
+
+ /**
+ * Invokes the given "callable", retrying the call when the server responds with an error
+ * denoting that the connection is missing on the server.
+ *
+ * <p>Before each retry the connection is re-opened on the server via
+ * {@link #openConnection()}. The callable is always invoked at least once, even
+ * if {@code maxRetriesPerExecute} is zero or negative. Any
+ * {@link AvaticaClientRuntimeException} with a different error code is rethrown
+ * immediately; exceptions of other types are not caught at all.
+ *
+ * @param callable The function to invoke.
+ * @return The value from the result of the callable.
+ * @throws AvaticaClientRuntimeException the last "missing connection" error, if
+ * every attempt failed that way
+ */
+ public <T> T invokeWithRetries(CallableWithoutException<T> callable) {
+ // Guarantee one attempt: with a non-positive setting the loop body would
+ // otherwise never run and the callable would never be invoked.
+ final long maxAttempts = Math.max(1L, maxRetriesPerExecute);
+ RuntimeException lastException = null;
+ for (long attempt = 0; attempt < maxAttempts; attempt++) {
+ try {
+ return callable.call();
+ } catch (AvaticaClientRuntimeException e) {
+ if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE != e.getErrorCode()) {
+ // Not a missing-connection error; retrying would not help.
+ throw e;
+ }
+ lastException = e;
+ // Re-create the server-side connection state before trying again.
+ this.openConnection();
+ }
+ }
+ if (null != lastException) {
+ throw lastException;
+ } else {
+ // Unreachable: at least one attempt ran and either returned or threw.
+ throw new IllegalStateException();
+ }
+ }
}
// End AvaticaConnection.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
index 27a2687..b57f36c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.avatica;
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
+import org.apache.calcite.avatica.remote.MetaDataOperation;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
@@ -182,28 +184,53 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public String getSQLKeywords() throws SQLException {
- return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS
- .getProp(connection.meta, connection.handle, String.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<String>() {
+ public String call() {
+ return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS
+ .getProp(connection.meta, connection.handle, String.class);
+ }
+ });
}
public String getNumericFunctions() throws SQLException {
- return Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS
- .getProp(connection.meta, connection.handle, String.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<String>() {
+ public String call() {
+ return Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS
+ .getProp(connection.meta, connection.handle, String.class);
+ }
+ });
}
public String getStringFunctions() throws SQLException {
- return Meta.DatabaseProperty.GET_STRING_FUNCTIONS
- .getProp(connection.meta, connection.handle, String.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<String>() {
+ public String call() {
+ return Meta.DatabaseProperty.GET_STRING_FUNCTIONS
+ .getProp(connection.meta, connection.handle, String.class);
+ }
+ });
}
public String getSystemFunctions() throws SQLException {
- return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS
- .getProp(connection.meta, connection.handle, String.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<String>() {
+ public String call() {
+ return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS
+ .getProp(connection.meta, connection.handle, String.class);
+ }
+ });
}
public String getTimeDateFunctions() throws SQLException {
- return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS
- .getProp(connection.meta, connection.handle, String.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<String>() {
+ public String call() {
+ return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS
+ .getProp(connection.meta, connection.handle, String.class);
+ }
+ });
}
public String getSearchStringEscape() throws SQLException {
@@ -234,8 +261,7 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
return true;
}
- public boolean supportsConvert(
- int fromType, int toType) throws SQLException {
+ public boolean supportsConvert(int fromType, int toType) throws SQLException {
return false; // TODO: more detail
}
@@ -528,8 +554,13 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public int getDefaultTransactionIsolation() throws SQLException {
- return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION
- .getProp(connection.meta, connection.handle, Integer.class);
+ return connection.invokeWithRetries(
+ new CallableWithoutException<Integer>() {
+ public Integer call() {
+ return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION
+ .getProp(connection.meta, connection.handle, Integer.class);
+ }
+ });
}
public boolean supportsTransactions() throws SQLException {
@@ -560,33 +591,90 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getProcedures(
- String catalog,
- String schemaPattern,
- String procedureNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern),
- pat(procedureNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String procedureNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern),
+ pat(procedureNamePattern)),
+ new QueryState(MetaDataOperation.GET_PROCEDURES, catalog, schemaPattern,
+ procedureNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getProcedureColumns(
- String catalog,
- String schemaPattern,
- String procedureNamePattern,
- String columnNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getProcedureColumns(connection.handle, catalog, pat(schemaPattern),
- pat(procedureNamePattern), pat(columnNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String procedureNamePattern,
+ final String columnNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getProcedureColumns(connection.handle, catalog,
+ pat(schemaPattern), pat(procedureNamePattern), pat(columnNamePattern)),
+ new QueryState(MetaDataOperation.GET_PROCEDURE_COLUMNS, catalog, schemaPattern,
+ procedureNamePattern, columnNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getTables(
- String catalog,
+ final String catalog,
final String schemaPattern,
- String tableNamePattern,
- String[] types) throws SQLException {
- List<String> typeList = types == null ? null : Arrays.asList(types);
- return connection.createResultSet(
- connection.meta.getTables(connection.handle, catalog, pat(schemaPattern),
- pat(tableNamePattern), typeList));
+ final String tableNamePattern,
+ final String[] types) throws SQLException {
+ final List<String> typeList = types == null ? null : Arrays.asList(types);
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getTables(connection.handle, catalog, pat(schemaPattern),
+ pat(tableNamePattern), typeList),
+ new QueryState(MetaDataOperation.GET_TABLES, catalog, schemaPattern,
+ tableNamePattern, types));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
private static Meta.Pat pat(String schemaPattern) {
@@ -594,11 +682,30 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getSchemas(
- String catalog, String schemaPattern) throws SQLException {
+ final String catalog, final String schemaPattern) throws SQLException {
// TODO: add a 'catch ... throw new SQLException' logic to this and other
// getXxx methods. Right now any error will throw a RuntimeException
- return connection.createResultSet(
- connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern)));
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern)),
+ new QueryState(MetaDataOperation.GET_SCHEMAS_WITH_ARGS, catalog,
+ schemaPattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getSchemas() throws SQLException {
@@ -606,103 +713,342 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getCatalogs() throws SQLException {
- return connection.createResultSet(connection.meta.getCatalogs(connection.handle));
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(connection.meta.getCatalogs(connection.handle),
+ new QueryState(MetaDataOperation.GET_CATALOGS));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getTableTypes() throws SQLException {
- return connection.createResultSet(connection.meta.getTableTypes(connection.handle));
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(connection.meta.getTableTypes(connection.handle),
+ new QueryState(MetaDataOperation.GET_TABLE_TYPES));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getColumns(
- String catalog,
- String schemaPattern,
- String tableNamePattern,
- String columnNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getColumns(connection.handle,
- catalog, pat(schemaPattern),
- pat(tableNamePattern), pat(columnNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String tableNamePattern,
+ final String columnNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getColumns(connection.handle, catalog, pat(schemaPattern),
+ pat(tableNamePattern), pat(columnNamePattern)),
+ new QueryState(MetaDataOperation.GET_COLUMNS, catalog, schemaPattern,
+ tableNamePattern, columnNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getColumnPrivileges(
- String catalog,
- String schema,
- String table,
- String columnNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table,
- pat(columnNamePattern)));
+ final String catalog,
+ final String schema,
+ final String table,
+ final String columnNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table,
+ pat(columnNamePattern)),
+ new QueryState(MetaDataOperation.GET_COLUMN_PRIVILEGES, catalog, schema, table,
+ columnNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getTablePrivileges(
- String catalog,
- String schemaPattern,
- String tableNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getTablePrivileges(connection.handle, catalog, pat(schemaPattern),
- pat(tableNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String tableNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getTablePrivileges(connection.handle, catalog,
+ pat(schemaPattern), pat(tableNamePattern)),
+ new QueryState(MetaDataOperation.GET_TABLE_PRIVILEGES, catalog, schemaPattern,
+ tableNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getBestRowIdentifier(
- String catalog,
- String schema,
- String table,
- int scope,
- boolean nullable) throws SQLException {
- return connection.createResultSet(
- connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table, scope,
- nullable));
+ final String catalog,
+ final String schema,
+ final String table,
+ final int scope,
+ final boolean nullable) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table,
+ scope, nullable),
+ new QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, catalog, table, scope,
+ nullable));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getVersionColumns(
- String catalog, String schema, String table) throws SQLException {
- return connection.createResultSet(
- connection.meta.getVersionColumns(connection.handle, catalog, schema, table));
+ final String catalog, final String schema, final String table) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getVersionColumns(connection.handle, catalog, schema, table),
+ new QueryState(MetaDataOperation.GET_VERSION_COLUMNS, catalog, schema, table));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getPrimaryKeys(
- String catalog, String schema, String table) throws SQLException {
- return connection.createResultSet(
- connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table));
+ final String catalog, final String schema, final String table) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table),
+ new QueryState(MetaDataOperation.GET_PRIMARY_KEYS, catalog, schema, table));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getImportedKeys(
- String catalog, String schema, String table) throws SQLException {
- return connection.createResultSet(
- connection.meta.getImportedKeys(connection.handle, catalog, schema, table));
+ final String catalog, final String schema, final String table) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getImportedKeys(connection.handle, catalog, schema, table),
+ new QueryState(MetaDataOperation.GET_IMPORTED_KEYS, catalog, schema, table));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getExportedKeys(
- String catalog, String schema, String table) throws SQLException {
- return connection.createResultSet(
- connection.meta.getExportedKeys(connection.handle, catalog, schema, table));
+ final String catalog, final String schema, final String table) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getExportedKeys(connection.handle, catalog, schema, table),
+ new QueryState(MetaDataOperation.GET_EXPORTED_KEYS, catalog, schema, table));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getCrossReference(
- String parentCatalog,
- String parentSchema,
- String parentTable,
- String foreignCatalog,
- String foreignSchema,
- String foreignTable) throws SQLException {
- return connection.createResultSet(
- connection.meta.getCrossReference(connection.handle, parentCatalog, parentSchema,
- parentTable, foreignCatalog, foreignSchema, foreignTable));
+ final String parentCatalog,
+ final String parentSchema,
+ final String parentTable,
+ final String foreignCatalog,
+ final String foreignSchema,
+ final String foreignTable) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getCrossReference(connection.handle, parentCatalog,
+ parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable),
+ new QueryState(MetaDataOperation.GET_CROSS_REFERENCE, parentCatalog,
+ parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getTypeInfo() throws SQLException {
- return connection.createResultSet(connection.meta.getTypeInfo(connection.handle));
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(connection.meta.getTypeInfo(connection.handle),
+ new QueryState(MetaDataOperation.GET_TYPE_INFO));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getIndexInfo(
- String catalog,
- String schema,
- String table,
- boolean unique,
- boolean approximate) throws SQLException {
- return connection.createResultSet(
- connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
- approximate));
+ final String catalog,
+ final String schema,
+ final String table,
+ final boolean unique,
+ final boolean approximate) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
+ approximate),
+ new QueryState(MetaDataOperation.GET_INDEX_INFO, catalog, schema, table, unique,
+ approximate));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public boolean supportsResultSetType(int type) throws SQLException {
@@ -756,13 +1102,32 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getUDTs(
- String catalog,
- String schemaPattern,
- String typeNamePattern,
- int[] types) throws SQLException {
- return connection.createResultSet(
- connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
- pat(typeNamePattern), types));
+ final String catalog,
+ final String schemaPattern,
+ final String typeNamePattern,
+ final int[] types) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
+ pat(typeNamePattern), types),
+ new QueryState(MetaDataOperation.GET_UDTS, catalog, schemaPattern,
+ typeNamePattern, types));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public Connection getConnection() throws SQLException {
@@ -786,31 +1151,88 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getSuperTypes(
- String catalog,
- String schemaPattern,
- String typeNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
- pat(typeNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String typeNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
+ pat(typeNamePattern)),
+ new QueryState(MetaDataOperation.GET_SUPER_TYPES, catalog, schemaPattern,
+ typeNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getSuperTables(
- String catalog,
- String schemaPattern,
- String tableNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
- pat(tableNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String tableNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
+ pat(tableNamePattern)),
+ new QueryState(MetaDataOperation.GET_SUPER_TABLES, catalog, schemaPattern,
+ tableNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getAttributes(
- String catalog,
- String schemaPattern,
- String typeNamePattern,
- String attributeNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
- pat(typeNamePattern), pat(attributeNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String typeNamePattern,
+ final String attributeNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
+ pat(typeNamePattern), pat(attributeNamePattern)),
+ new QueryState(MetaDataOperation.GET_ATTRIBUTES, catalog, schemaPattern,
+ typeNamePattern, attributeNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public boolean supportsResultSetHoldability(int holdability)
@@ -864,37 +1286,112 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
}
public ResultSet getClientInfoProperties() throws SQLException {
- return connection.createResultSet(
- connection.meta.getClientInfoProperties(connection.handle));
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getClientInfoProperties(connection.handle),
+ new QueryState(MetaDataOperation.GET_CLIENT_INFO_PROPERTIES));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getFunctions(
- String catalog,
- String schemaPattern,
- String functionNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
- pat(functionNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String functionNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
+ pat(functionNamePattern)),
+ new QueryState(MetaDataOperation.GET_FUNCTIONS, catalog, schemaPattern,
+ functionNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getFunctionColumns(
- String catalog,
- String schemaPattern,
- String functionNamePattern,
- String columnNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getFunctionColumns(connection.handle, catalog, pat(schemaPattern),
- pat(functionNamePattern), pat(columnNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String functionNamePattern,
+ final String columnNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getFunctionColumns(connection.handle, catalog,
+ pat(schemaPattern), pat(functionNamePattern), pat(columnNamePattern)),
+ new QueryState(MetaDataOperation.GET_FUNCTION_COLUMNS, catalog,
+ schemaPattern, functionNamePattern, columnNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public ResultSet getPseudoColumns(
- String catalog,
- String schemaPattern,
- String tableNamePattern,
- String columnNamePattern) throws SQLException {
- return connection.createResultSet(
- connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
- pat(tableNamePattern), pat(columnNamePattern)));
+ final String catalog,
+ final String schemaPattern,
+ final String tableNamePattern,
+ final String columnNamePattern) throws SQLException {
+ try {
+ return connection.invokeWithRetries(
+ new CallableWithoutException<ResultSet>() {
+ public ResultSet call() {
+ try {
+ return connection.createResultSet(
+ connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
+ pat(tableNamePattern), pat(columnNamePattern)),
+ new QueryState(MetaDataOperation.GET_PSEUDO_COLUMNS, catalog, schemaPattern,
+ tableNamePattern, columnNamePattern));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ throw (SQLException) cause;
+ }
+ throw e;
+ }
}
public boolean generatedKeyAlwaysReturned() throws SQLException {
http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
index c6d2ede..1a2a97f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
@@ -51,13 +51,14 @@ public interface AvaticaFactory {
* {@link AvaticaResultSet#execute()} on it.
*
* @param statement Statement
+ * @param state The state used to create this result set
* @param signature Prepared statement
* @param timeZone Time zone
* @param firstFrame Frame containing the first (or perhaps only) rows in the
* result, or null if an execute/fetch is required
* @return Result set
*/
- AvaticaResultSet newResultSet(AvaticaStatement statement,
+ AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state,
Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame)
throws SQLException;