You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/10/29 01:46:24 UTC

[6/6] calcite git commit: [CALCITE-903] Enable Avatica client to recover from missing server-side state (Josh Elser)

[CALCITE-903] Enable Avatica client to recover from missing server-side state (Josh Elser)

Close apache/calcite#140


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/97df1acb
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/97df1acb
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/97df1acb

Branch: refs/heads/master
Commit: 97df1acbe82881f0fd196477dadbcbc173bddc20
Parents: 1a491b7
Author: Josh Elser <el...@apache.org>
Authored: Wed Sep 23 11:53:13 2015 -0400
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Oct 28 16:42:33 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   |  109 +-
 .../calcite/avatica/jdbc/JdbcResultSet.java     |   12 +-
 .../calcite/avatica/jdbc/StatementInfo.java     |  170 ++
 .../calcite/avatica/jdbc/StatementInfoTest.java |  138 +
 .../remote/AlternatingRemoteMetaTest.java       |  392 +++
 .../calcite/avatica/remote/RemoteMetaTest.java  |    6 +-
 .../calcite/avatica/AvaticaConnection.java      |   77 +-
 .../avatica/AvaticaDatabaseMetaData.java        |  789 ++++-
 .../apache/calcite/avatica/AvaticaFactory.java  |    3 +-
 .../calcite/avatica/AvaticaJdbc41Factory.java   |    4 +-
 .../avatica/AvaticaPreparedStatement.java       |   11 +-
 .../calcite/avatica/AvaticaResultSet.java       |    5 +-
 .../calcite/avatica/AvaticaStatement.java       |   40 +-
 .../java/org/apache/calcite/avatica/Meta.java   |   17 +-
 .../org/apache/calcite/avatica/MetaImpl.java    |   69 +-
 .../avatica/MissingResultsException.java        |   41 +
 .../avatica/NoSuchConnectionException.java      |   37 +
 .../avatica/NoSuchStatementException.java       |   39 +
 .../org/apache/calcite/avatica/QueryState.java  |  489 +++
 .../calcite/avatica/UnregisteredDriver.java     |    2 +-
 .../apache/calcite/avatica/proto/Common.java    | 2847 +++++++++++++++++-
 .../apache/calcite/avatica/proto/Requests.java  |  770 ++++-
 .../apache/calcite/avatica/proto/Responses.java |  769 ++++-
 .../calcite/avatica/remote/AbstractHandler.java |    5 +
 .../calcite/avatica/remote/AbstractService.java |    5 +-
 .../avatica/remote/AvaticaHttpClient.java       |   34 +
 .../avatica/remote/AvaticaHttpClientImpl.java   |   73 +
 .../apache/calcite/avatica/remote/Driver.java   |   65 +-
 .../apache/calcite/avatica/remote/Handler.java  |    4 +
 .../calcite/avatica/remote/JsonService.java     |    8 +
 .../calcite/avatica/remote/LocalService.java    |   99 +-
 .../avatica/remote/MetaDataOperation.java       |  181 ++
 .../calcite/avatica/remote/MockJsonService.java |    2 +-
 .../avatica/remote/MockProtobufService.java     |    2 +-
 .../calcite/avatica/remote/ProtobufService.java |    4 +
 .../avatica/remote/ProtobufTranslationImpl.java |    6 +
 .../calcite/avatica/remote/RemoteMeta.java      |  368 ++-
 .../avatica/remote/RemoteProtobufService.java   |   42 +-
 .../calcite/avatica/remote/RemoteService.java   |   45 +-
 .../apache/calcite/avatica/remote/Service.java  |  227 +-
 avatica/src/main/protobuf/common.proto          |   64 +
 avatica/src/main/protobuf/requests.proto        |    7 +
 avatica/src/main/protobuf/responses.proto       |    8 +
 .../calcite/avatica/AvaticaConnectionTest.java  |   60 +
 .../apache/calcite/avatica/QueryStateTest.java  |  513 ++++
 .../avatica/remote/AvaticaHttpClientTest.java   |   93 +
 .../avatica/remote/MetaDataOperationTest.java   |   37 +
 .../avatica/remote/ProtobufHandlerTest.java     |    2 +-
 .../remote/ProtobufTranslationImplTest.java     |   37 +-
 .../calcite/avatica/test/JsonHandlerTest.java   |    6 +-
 .../calcite/jdbc/CalciteJdbc41Factory.java      |    3 +-
 .../apache/calcite/jdbc/CalciteMetaImpl.java    |   20 +-
 .../apache/calcite/jdbc/CalciteResultSet.java   |    2 +-
 src/main/config/checkstyle/checker.xml          |    4 -
 54 files changed, 8302 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index b8e4ea4..1bfd7f6 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -23,6 +23,10 @@ import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.ConnectionPropertiesImpl;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchConnectionException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
 import org.apache.calcite.avatica.SqlType;
 import org.apache.calcite.avatica.remote.TypedValue;
 
@@ -51,7 +55,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -64,7 +67,7 @@ public class JdbcMeta implements Meta {
 
   private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache";
 
-  /** Special value for {@link Statement#getLargeMaxRows()} that means fetch
+  /** Special value for {@code Statement#getLargeMaxRows()} that means fetch
    * an unlimited number of rows in a single batch.
    *
    * <p>Any other negative value will return an unlimited number of rows but
@@ -290,7 +293,7 @@ public class JdbcMeta implements Meta {
   private int registerMetaStatement(ResultSet rs) throws SQLException {
     final int id = statementIdGenerator.getAndIncrement();
     StatementInfo statementInfo = new StatementInfo(rs.getStatement());
-    statementInfo.resultSet = rs;
+    statementInfo.setResultSet(rs);
     statementCache.put(id, statementInfo);
     return id;
   }
@@ -508,7 +511,7 @@ public class JdbcMeta implements Meta {
     return null;
   }
 
-  public Iterable<Object> createIterable(StatementHandle handle,
+  public Iterable<Object> createIterable(StatementHandle handle, QueryState state,
       Signature signature, List<TypedValue> parameterValues, Frame firstFrame) {
     return null;
   }
@@ -519,7 +522,8 @@ public class JdbcMeta implements Meta {
     }
     Connection conn = connectionCache.getIfPresent(id);
     if (conn == null) {
-      throw new RuntimeException("Connection not found: invalid id, closed, or expired: " + id);
+      throw new NoSuchConnectionException("Connection not found: invalid id, closed, or expired: "
+          + id);
     }
     return conn;
   }
@@ -550,8 +554,9 @@ public class JdbcMeta implements Meta {
       LOG.trace("closing statement " + h);
     }
     try {
-      if (info.resultSet != null) {
-        info.resultSet.close();
+      ResultSet results = info.getResultSet();
+      if (info.isResultSetInitialized() && null != results) {
+        results.close();
       }
       info.statement.close();
     } catch (SQLException e) {
@@ -674,12 +679,11 @@ public class JdbcMeta implements Meta {
   }
 
   public ExecuteResult prepareAndExecute(StatementHandle h, String sql,
-      long maxRowCount, PrepareCallback callback) {
+      long maxRowCount, PrepareCallback callback) throws NoSuchStatementException {
     try {
       final StatementInfo info = statementCache.getIfPresent(h.id);
       if (info == null) {
-        throw new RuntimeException("Statement not found, potentially expired. "
-            + h);
+        throw new NoSuchStatementException(h);
       }
       final Statement statement = info.statement;
       // Special handling of maxRowCount as JDBC 0 is unlimited, our meta 0 row
@@ -689,18 +693,18 @@ public class JdbcMeta implements Meta {
         statement.setMaxRows(0);
       }
       boolean ret = statement.execute(sql);
-      info.resultSet = statement.getResultSet();
-      assert ret || info.resultSet == null;
+      info.setResultSet(statement.getResultSet());
+      // Either execute(sql) returned true or the resultSet was null
+      assert ret || null == info.getResultSet();
       final List<MetaResultSet> resultSets = new ArrayList<>();
-      if (info.resultSet == null) {
+      if (null == info.getResultSet()) {
         // Create a special result set that just carries update count
         resultSets.add(
             JdbcResultSet.count(h.connectionId, h.id,
                 AvaticaUtils.getLargeUpdateCount(statement)));
       } else {
         resultSets.add(
-            JdbcResultSet.create(h.connectionId, h.id, info.resultSet,
-                maxRowCount));
+            JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), maxRowCount));
       }
       if (LOG.isTraceEnabled()) {
         LOG.trace("prepAndExec statement " + h);
@@ -712,19 +716,51 @@ public class JdbcMeta implements Meta {
     }
   }
 
-  public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) {
+  public boolean syncResults(StatementHandle sh, QueryState state, long offset)
+      throws NoSuchStatementException {
+    try {
+      final Connection conn = getConnection(sh.connectionId);
+      final StatementInfo info = statementCache.getIfPresent(sh.id);
+      if (null == info) {
+        throw new NoSuchStatementException(sh);
+      }
+      final Statement statement = info.statement;
+      // Let the state recreate the necessary ResultSet on the Statement
+      info.setResultSet(state.invoke(conn, statement));
+
+      if (null != info.getResultSet()) {
+        // If it is non-null, try to advance to the requested offset.
+        return info.advanceResultSetToOffset(info.getResultSet(), offset);
+      }
+
+      // No results, nothing to do. Client can move on.
+      return false;
+    } catch (SQLException e) {
+      throw propagate(e);
+    }
+  }
+
+  public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws
+      NoSuchStatementException, MissingResultsException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:"
           + fetchMaxRowCount);
     }
     try {
-      final StatementInfo statementInfo = Objects.requireNonNull(
-          statementCache.getIfPresent(h.id),
-          "Statement not found, potentially expired. " + h);
-      if (statementInfo.resultSet == null) {
+      final StatementInfo statementInfo = statementCache.getIfPresent(h.id);
+      if (null == statementInfo) {
+        // Statement might have expired, or never existed on this server.
+        throw new NoSuchStatementException(h);
+      }
+
+      if (!statementInfo.isResultSetInitialized()) {
+        // The Statement exists, but the results are missing. Need to call syncResults(...)
+        throw new MissingResultsException(h);
+      }
+      if (statementInfo.getResultSet() == null) {
         return Frame.EMPTY;
       } else {
-        return JdbcResultSet.frame(statementInfo.resultSet, offset,
+        return JdbcResultSet.frame(statementInfo, statementInfo.getResultSet(), offset,
             fetchMaxRowCount, calendar);
       }
     } catch (SQLException e) {
@@ -740,15 +776,16 @@ public class JdbcMeta implements Meta {
   }
 
   @Override public ExecuteResult execute(StatementHandle h,
-      List<TypedValue> parameterValues, long maxRowCount) {
+      List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException {
     try {
       if (MetaImpl.checkParameterValueHasNull(parameterValues)) {
         throw new SQLException("exception while executing query: unbound parameter");
       }
 
-      final StatementInfo statementInfo = Objects.requireNonNull(
-          statementCache.getIfPresent(h.id),
-          "Statement not found, potentially expired. " + h);
+      final StatementInfo statementInfo = statementCache.getIfPresent(h.id);
+      if (null == statementInfo) {
+        throw new NoSuchStatementException(h);
+      }
       final List<MetaResultSet> resultSets = new ArrayList<>();
       final PreparedStatement preparedStatement =
           (PreparedStatement) statementInfo.statement;
@@ -772,14 +809,16 @@ public class JdbcMeta implements Meta {
           signature2 = h.signature;
         }
 
-        statementInfo.resultSet = preparedStatement.getResultSet();
-        if (statementInfo.resultSet == null) {
+        // Make sure we set this for subsequent fetch()'s to find the result set.
+        statementInfo.setResultSet(preparedStatement.getResultSet());
+
+        if (statementInfo.getResultSet() == null) {
           frame = Frame.EMPTY;
           resultSets.add(JdbcResultSet.empty(h.connectionId, h.id, signature2));
         } else {
           resultSets.add(
               JdbcResultSet.create(h.connectionId, h.id,
-                  statementInfo.resultSet, maxRowCount, signature2));
+                  statementInfo.getResultSet(), maxRowCount, signature2));
         }
       } else {
         resultSets.add(
@@ -793,16 +832,6 @@ public class JdbcMeta implements Meta {
     }
   }
 
-  /** All we know about a statement. */
-  private static class StatementInfo {
-    final Statement statement; // sometimes a PreparedStatement
-    ResultSet resultSet;
-
-    private StatementInfo(Statement statement) {
-      this.statement = Objects.requireNonNull(statement);
-    }
-  }
-
   /** Configurable statement cache settings. */
   public enum StatementCacheSettings {
     /** JDBC connection property for setting connection cache concurrency level. */
@@ -917,8 +946,8 @@ public class JdbcMeta implements Meta {
                 + notification.getCause());
       }
       try {
-        if (doomed.resultSet != null) {
-          doomed.resultSet.close();
+        if (doomed.getResultSet() != null) {
+          doomed.getResultSet().close();
         }
         if (doomed.statement != null) {
           doomed.statement.close();

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index 30ee7f4..6630124 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -88,7 +88,7 @@ class JdbcResultSet extends Meta.MetaResultSet {
       } else {
         fetchRowCount = (int) maxRowCount;
       }
-      final Meta.Frame firstFrame = frame(resultSet, 0, fetchRowCount, calendar);
+      final Meta.Frame firstFrame = frame(null, resultSet, 0, fetchRowCount, calendar);
       if (firstFrame.done) {
         resultSet.close();
       }
@@ -114,7 +114,7 @@ class JdbcResultSet extends Meta.MetaResultSet {
 
   /** Creates a frame containing a given number or unlimited number of rows
    * from a result set. */
-  static Meta.Frame frame(ResultSet resultSet, long offset,
+  static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset,
       int fetchMaxRowCount, Calendar calendar) throws SQLException {
     final ResultSetMetaData metaData = resultSet.getMetaData();
     final int columnCount = metaData.getColumnCount();
@@ -126,7 +126,13 @@ class JdbcResultSet extends Meta.MetaResultSet {
     // Meta prepare/prepareAndExecute 0 return 0 row and done
     boolean done = fetchMaxRowCount == 0;
     for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) {
-      if (!resultSet.next()) {
+      final boolean hasRow;
+      if (null != info) {
+        hasRow = info.next();
+      } else {
+        hasRow = resultSet.next();
+      }
+      if (!hasRow) {
         done = true;
         resultSet.close();
         break;

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
new file mode 100644
index 0000000..ff27d05
--- /dev/null
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.Objects;
+
+/**
+ * All we know about a statement. Encapsulates a {@link ResultSet}.
+ */
+public class StatementInfo {
+  private volatile Boolean relativeSupported = null;
+
+  final Statement statement; // sometimes a PreparedStatement
+  private ResultSet resultSet;
+  private long position = 0;
+
+  // Set to true once setResultSet(ResultSet) has been called, so we can tell a null ResultSet
+  // (from an update) apart from a ResultSet that was never set.
+  private boolean resultsInitialized = false;
+
+  public StatementInfo(Statement statement) {
+    this.statement = Objects.requireNonNull(statement);
+  }
+
+  // Visible for testing
+  void setPosition(long position) {
+    this.position = position;
+  }
+
+  // Visible for testing
+  long getPosition() {
+    return this.position;
+  }
+
+  /**
+   * Set a ResultSet on this object.
+   *
+   * @param resultSet The current ResultSet
+   */
+  public void setResultSet(ResultSet resultSet) {
+    resultsInitialized = true;
+    this.resultSet = resultSet;
+  }
+
+  /**
+   * @return The {@link ResultSet} for this Statement, may be null.
+   */
+  public ResultSet getResultSet() {
+    return this.resultSet;
+  }
+
+  /**
+   * @return True if {@link #setResultSet(ResultSet)} was ever invoked.
+   */
+  public boolean isResultSetInitialized() {
+    return resultsInitialized;
+  }
+
+  /**
+   * @see ResultSet#next()
+   */
+  public boolean next() throws SQLException {
+    return _next(resultSet);
+  }
+
+  boolean _next(ResultSet results) throws SQLException {
+    boolean ret = results.next();
+    position++;
+    return ret;
+  }
+
+  /**
+   * Consumes <code>offset - position</code> elements from the {@link ResultSet}.
+   *
+   * @param offset The offset to advance to
+   * @return True if the resultSet was advanced to the requested offset, false if too few rows
+   *      were present to reach that offset.
+   */
+  public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException {
+    if (offset < 0 || offset < position) {
+      throw new IllegalArgumentException("Offset should be "
+        + " non-negative and not less than the current position. " + offset + ", " + position);
+    }
+    if (position >= offset) {
+      return true;
+    }
+
+    if (null == relativeSupported) {
+      Boolean moreResults = null;
+      synchronized (this) {
+        if (null == relativeSupported) {
+          try {
+            moreResults = advanceByRelative(results, offset);
+            relativeSupported = true;
+          } catch (SQLFeatureNotSupportedException e) {
+            relativeSupported = false;
+          }
+        }
+      }
+
+      if (null != moreResults) {
+        // We figured out whether or not relative is supported.
+        // Make sure we actually do the necessary work.
+        if (!relativeSupported) {
+          // We avoided calling advanceByNext in the synchronized block earlier.
+          moreResults = advanceByNext(results, offset);
+        }
+
+        return moreResults;
+      }
+
+      // Another thread set relativeSupported before we did; fall through.
+    }
+
+    if (relativeSupported) {
+      return advanceByRelative(results, offset);
+    } else {
+      return advanceByNext(results, offset);
+    }
+  }
+
+  private boolean advanceByRelative(ResultSet results, long offset) throws SQLException {
+    long diff = offset - position;
+    while (diff > Integer.MAX_VALUE) {
+      if (!results.relative(Integer.MAX_VALUE)) {
+        // relative() did not throw, so record the jump even though no row remains.
+        position += Integer.MAX_VALUE;
+        return false;
+      }
+      // Avoid updating position until relative succeeds.
+      position += Integer.MAX_VALUE;
+      diff -= Integer.MAX_VALUE;
+    }
+    boolean ret = results.relative((int) diff);
+    // Make sure we only update the position after successfully calling relative(int).
+    position += diff;
+    return ret;
+  }
+
+  private boolean advanceByNext(ResultSet results, long offset) throws SQLException {
+    while (position < offset) {
+      // Advance while maintaining `position`
+      if (!_next(results)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
+
+// End StatementInfo.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
new file mode 100644
index 0000000..2984692
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.sql.ResultSet;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests covering {@link StatementInfo}.
+ */
+public class StatementInfoTest {
+
+  @Test
+  public void testLargeOffsets() throws Exception {
+    Statement stmt = Mockito.mock(Statement.class);
+    ResultSet results = Mockito.mock(ResultSet.class);
+
+    StatementInfo info = new StatementInfo(stmt);
+
+    Mockito.when(results.relative(Integer.MAX_VALUE)).thenReturn(true, true);
+    Mockito.when(results.relative(1)).thenReturn(true);
+
+    long offset = 1L + Integer.MAX_VALUE + Integer.MAX_VALUE;
+    assertTrue(info.advanceResultSetToOffset(results, offset));
+
+    InOrder inOrder = Mockito.inOrder(results);
+
+    inOrder.verify(results, Mockito.times(2)).relative(Integer.MAX_VALUE);
+    inOrder.verify(results).relative(1);
+
+    assertEquals(offset, info.getPosition());
+  }
+
+  @Test
+  public void testNextUpdatesPosition() throws Exception {
+    Statement stmt = Mockito.mock(Statement.class);
+    ResultSet results = Mockito.mock(ResultSet.class);
+
+    StatementInfo info = new StatementInfo(stmt);
+    info.setResultSet(results);
+
+    Mockito.when(results.next()).thenReturn(true, true, true, false);
+
+    for (int i = 0; i < 3; i++) {
+      assertTrue(i + "th call of next() should return true", info.next());
+      assertEquals(info.getPosition(), i + 1);
+    }
+
+    assertFalse("Expected last next() to return false", info.next());
+    assertEquals(info.getPosition(), 4L);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoMovement() throws Exception {
+    Statement stmt = Mockito.mock(Statement.class);
+    ResultSet results = Mockito.mock(ResultSet.class);
+
+    StatementInfo info = new StatementInfo(stmt);
+    info.setPosition(500);
+
+    info.advanceResultSetToOffset(results, 400);
+  }
+
+  @Test public void testResultSetGetter() throws Exception {
+    Statement stmt = Mockito.mock(Statement.class);
+    ResultSet results = Mockito.mock(ResultSet.class);
+
+    StatementInfo info = new StatementInfo(stmt);
+
+    assertFalse("ResultSet should not be initialized", info.isResultSetInitialized());
+    assertNull("ResultSet should be null", info.getResultSet());
+
+    info.setResultSet(results);
+
+    assertTrue("ResultSet should be initialized", info.isResultSetInitialized());
+    assertEquals(results, info.getResultSet());
+  }
+
+  @Test public void testCheckPositionAfterFailedRelative() throws Exception {
+    Statement stmt = Mockito.mock(Statement.class);
+    ResultSet results = Mockito.mock(ResultSet.class);
+    final long offset = 500;
+
+    StatementInfo info = new StatementInfo(stmt);
+    info.setResultSet(results);
+
+    // relative() doesn't work
+    Mockito.when(results.relative((int) offset)).thenThrow(new SQLFeatureNotSupportedException());
+    // Should fall back to next(), 500 calls to next, 1 false
+    Mockito.when(results.next()).then(new Answer<Boolean>() {
+      private long invocations = 0;
+
+      // Return true until 500, false after.
+      @Override public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        invocations++;
+        if (invocations >= offset) {
+          return false;
+        }
+        return true;
+      }
+    });
+
+    info.advanceResultSetToOffset(results, offset);
+
+    // Verify correct position
+    assertEquals(offset, info.getPosition());
+    // Make sure that we actually advanced the result set
+    Mockito.verify(results, Mockito.times(500)).next();
+  }
+}
+
+// End StatementInfoTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
new file mode 100644
index 0000000..df2862d
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.ConnectionConfig;
+import org.apache.calcite.avatica.ConnectionPropertiesImpl;
+import org.apache.calcite.avatica.ConnectionSpec;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.server.AvaticaHandler;
+import org.apache.calcite.avatica.server.HttpServer;
+import org.apache.calcite.avatica.server.Main;
+import org.apache.calcite.avatica.server.Main.HandlerFactory;
+
+import com.google.common.cache.Cache;
+
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that verify that the Driver still functions when requests are randomly bounced between
+ * more than one server.
+ */
+public class AlternatingRemoteMetaTest {
+  private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB;
+
+  private static String url;
+
+  static {
+    try {
+      DriverManager.registerDriver(new AlternatingDriver());
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Keep a reference to the servers we start to clean them up after
+  private static final List<HttpServer> ACTIVE_SERVERS = new ArrayList<>();
+
+  /** Factory that provides a {@link JdbcMeta}. */
+  public static class FullyRemoteJdbcMetaFactory implements Meta.Factory {
+
+    private static JdbcMeta instance = null;
+
+    private static JdbcMeta getInstance() {
+      if (instance == null) {
+        try {
+          instance = new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
+              CONNECTION_SPEC.password);
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return instance;
+    }
+
+    @Override public Meta create(List<String> args) {
+      return getInstance();
+    }
+  }
+
+  /**
+   * AvaticaHttpClient implementation that randomly chooses among the provided URLs.
+   */
+  public static class AlternatingAvaticaHttpClient implements AvaticaHttpClient {
+    private final List<AvaticaHttpClientImpl> clients;
+    private final Random r = new Random();
+
+    public AlternatingAvaticaHttpClient(List<URL> urls) {
+      //System.out.println("Constructing clients for " + urls);
+      clients = new ArrayList<>(urls.size());
+      for (URL url : urls) {
+        clients.add(new AvaticaHttpClientImpl(url));
+      }
+    }
+
+    public byte[] send(byte[] request) {
+      AvaticaHttpClientImpl client = clients.get(r.nextInt(clients.size()));
+      //System.out.println("URL: " + client.url);
+      return client.send(request);
+    }
+
+  }
+
+  /**
+   * Driver implementation that uses {@link AlternatingAvaticaHttpClient}.
+   */
+  public static class AlternatingDriver extends Driver {
+
+    public static final String PREFIX = "jdbc:avatica:remote-alternating:";
+
+    @Override protected String getConnectStringPrefix() {
+      return PREFIX;
+    }
+
+    @Override public Meta createMeta(AvaticaConnection connection) {
+      final ConnectionConfig config = connection.config();
+      return new RemoteMeta(connection, new RemoteService(getHttpClient(connection, config)));
+    }
+
+    @Override AvaticaHttpClient getHttpClient(AvaticaConnection connection,
+        ConnectionConfig config) {
+      return new AlternatingAvaticaHttpClient(parseUrls(config.url()));
+    }
+
+    List<URL> parseUrls(String urlStr) {
+      final List<URL> urls = new ArrayList<>();
+      final char comma = ',';
+
+      int prevIndex = 0;
+      int index = urlStr.indexOf(comma);
+      if (-1 == index) {
+        try {
+          return Collections.singletonList(new URL(urlStr));
+        } catch (MalformedURLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      // String split w/o regex
+      while (-1 != index) {
+        try {
+          urls.add(new URL(urlStr.substring(prevIndex, index)));
+        } catch (MalformedURLException e) {
+          throw new RuntimeException(e);
+        }
+        prevIndex = index + 1;
+        index = urlStr.indexOf(comma, prevIndex);
+      }
+
+      // Get the last one
+      try {
+        urls.add(new URL(urlStr.substring(prevIndex)));
+      } catch (MalformedURLException e) {
+        throw new RuntimeException(e);
+      }
+
+      return urls;
+    }
+
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    final String[] mainArgs = new String[] { FullyRemoteJdbcMetaFactory.class.getName() };
+
+    // Bind to '0' to pluck an ephemeral port instead of expecting a certain one to be free
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 2; i++) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      HttpServer jsonServer = Main.start(mainArgs, 0, new HandlerFactory() {
+        @Override public AbstractHandler createHandler(Service service) {
+          return new AvaticaHandler(service);
+        }
+      });
+      ACTIVE_SERVERS.add(jsonServer);
+      sb.append("http://localhost:").append(jsonServer.getPort());
+    }
+
+    url = AlternatingDriver.PREFIX + "url=" + sb.toString();
+  }
+
+  @AfterClass public static void afterClass() throws Exception {
+    for (HttpServer server : ACTIVE_SERVERS) {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  private static Meta getMeta(AvaticaConnection conn) throws Exception {
+    Field f = AvaticaConnection.class.getDeclaredField("meta");
+    f.setAccessible(true);
+    return (Meta) f.get(conn);
+  }
+
+  private static Meta.ExecuteResult prepareAndExecuteInternal(AvaticaConnection conn,
+    final AvaticaStatement statement, String sql, int maxRowCount) throws Exception {
+    Method m =
+        AvaticaConnection.class.getDeclaredMethod("prepareAndExecuteInternal",
+            AvaticaStatement.class, String.class, long.class);
+    m.setAccessible(true);
+    return (Meta.ExecuteResult) m.invoke(conn, statement, sql, maxRowCount);
+  }
+
+  private static Connection getConnection(JdbcMeta m, String id) throws Exception {
+    Field f = JdbcMeta.class.getDeclaredField("connectionCache");
+    f.setAccessible(true);
+    //noinspection unchecked
+    Cache<String, Connection> connectionCache = (Cache<String, Connection>) f.get(m);
+    return connectionCache.getIfPresent(id);
+  }
+
+  /** Verifies that maxRowCount=0 yields no rows but still returns result set metadata. */
+  @Test public void testRemoteExecuteMaxRowCount() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) {
+      final AvaticaStatement statement = conn.createStatement();
+      prepareAndExecuteInternal(conn, statement,
+        "select * from (values ('a', 1), ('b', 2))", 0);
+      ResultSet rs = statement.getResultSet();
+      int count = 0;
+      while (rs.next()) {
+        count++;
+      }
+      // JUnit's assertEquals takes (message, expected, actual), in that order.
+      assertEquals("Check maxRowCount=0 and ResultSets is 0 row", 0, count);
+      assertEquals("Check result set meta is still there", 2, rs.getMetaData().getColumnCount());
+      rs.close();
+      statement.close();
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-780">[CALCITE-780]
+   * HTTP error 413 when sending a long string to the Avatica server</a>. */
+  @Test public void testRemoteExecuteVeryLargeQuery() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try {
+      // Before the bug was fixed, a value over 7998 caused an HTTP 413.
+      // 16K bytes, I guess.
+      checkLargeQuery(8);
+      checkLargeQuery(240);
+      checkLargeQuery(8000);
+      checkLargeQuery(240000);
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
+  private void checkLargeQuery(int n) throws Exception {
+    try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) {
+      final AvaticaStatement statement = conn.createStatement();
+      final String frenchDisko = "It said human existence is pointless\n"
+          + "As acts of rebellious solidarity\n"
+          + "Can bring sense in this world\n"
+          + "La resistance!\n";
+      final String sql = "select '"
+          + longString(frenchDisko, n)
+          + "' as s from (values 'x')";
+      prepareAndExecuteInternal(conn, statement, sql, -1);
+      ResultSet rs = statement.getResultSet();
+      int count = 0;
+      while (rs.next()) {
+        count++;
+      }
+      assertThat(count, is(1));
+      rs.close();
+      statement.close();
+      conn.close();
+    }
+  }
+
+  /** Creates a string of exactly {@code length} characters by concatenating
+   * {@code fragment}. */
+  private static String longString(String fragment, int length) {
+    assert fragment.length() > 0;
+    final StringBuilder buf = new StringBuilder();
+    while (buf.length() < length) {
+      buf.append(fragment);
+    }
+    buf.setLength(length);
+    return buf.toString();
+  }
+
+  @Test public void testRemoteConnectionProperties() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) {
+      String id = conn.id;
+      final Map<String, ConnectionPropertiesImpl> m = ((RemoteMeta) getMeta(conn)).propsMap;
+      assertFalse("remote connection map should start ignorant", m.containsKey(id));
+      // force creating a connection object on the remote side.
+      try (final Statement stmt = conn.createStatement()) {
+        assertTrue("creating a statement starts a local object.", m.containsKey(id));
+        assertTrue(stmt.execute("select count(1) from EMP"));
+      }
+      Connection remoteConn = getConnection(FullyRemoteJdbcMetaFactory.getInstance(), id);
+      final boolean defaultRO = remoteConn.isReadOnly();
+      final boolean defaultAutoCommit = remoteConn.getAutoCommit();
+      final String defaultCatalog = remoteConn.getCatalog();
+      final String defaultSchema = remoteConn.getSchema();
+      conn.setReadOnly(!defaultRO);
+      assertTrue("local changes dirty local state", m.get(id).isDirty());
+      assertEquals("remote connection has not been touched", defaultRO, remoteConn.isReadOnly());
+      conn.setAutoCommit(!defaultAutoCommit);
+      assertEquals("remote connection has not been touched",
+          defaultAutoCommit, remoteConn.getAutoCommit());
+
+      // further interaction with the connection will force a sync
+      try (final Statement stmt = conn.createStatement()) {
+        assertEquals(!defaultAutoCommit, remoteConn.getAutoCommit());
+        assertFalse("local values should be clean", m.get(id).isDirty());
+      }
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
+  @Test public void testQuery() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+        Statement statement = conn.createStatement()) {
+      assertFalse(statement.execute("SET SCHEMA \"SCOTT\""));
+      assertFalse(
+          statement.execute(
+              "CREATE TABLE \"FOO\"(\"KEY\" INTEGER NOT NULL, \"VALUE\" VARCHAR(10))"));
+      assertFalse(statement.execute("SET TABLE \"FOO\" READONLY FALSE"));
+
+      final int numRecords = 1000;
+      for (int i = 0; i < numRecords; i++) {
+        assertFalse(statement.execute("INSERT INTO \"FOO\" VALUES(" + i + ", '" + i + "')"));
+      }
+
+      // Make sure all the records are there that we expect
+      ResultSet results = statement.executeQuery("SELECT count(KEY) FROM FOO");
+      assertTrue(results.next());
+      assertEquals(1000, results.getInt(1));
+      assertFalse(results.next());
+
+      results = statement.executeQuery("SELECT KEY, VALUE FROM FOO ORDER BY KEY ASC");
+      for (int i = 0; i < numRecords; i++) {
+        assertTrue(results.next());
+        assertEquals(i, results.getInt(1));
+        assertEquals(Integer.toString(i), results.getString(2));
+      }
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
+  @Test public void testSingleUrlParsing() throws Exception {
+    AlternatingDriver d = new AlternatingDriver();
+    List<URL> urls = d.parseUrls("http://localhost:1234");
+    assertEquals(Arrays.asList(new URL("http://localhost:1234")), urls);
+  }
+
+  @Test public void testMultipleUrlParsing() throws Exception {
+    AlternatingDriver d = new AlternatingDriver();
+    List<URL> urls = d.parseUrls("http://localhost:1234,http://localhost:2345,"
+        + "http://localhost:3456");
+    List<URL> expectedUrls = Arrays.asList(new URL("http://localhost:1234"),
+        new URL("http://localhost:2345"), new URL("http://localhost:3456"));
+    assertEquals(expectedUrls, urls);
+  }
+}
+
+// End AlternatingRemoteMetaTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
index e4d8690..c68f3cf 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
@@ -35,6 +35,7 @@ import com.google.common.cache.Cache;
 
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.AfterClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -344,6 +345,7 @@ public class RemoteMetaTest {
     }
   }
 
+  @Ignore("[CALCITE-942] AvaticaConnection should fail-fast when closed.")
   @Test public void testRemoteConnectionClosing() throws Exception {
     AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
     // Verify connection is usable
@@ -354,9 +356,9 @@ public class RemoteMetaTest {
     try {
       conn.createStatement();
       fail("expected exception");
-    } catch (RuntimeException e) {
+    } catch (SQLException e) {
       assertThat(e.getMessage(),
-          containsString("Connection not found: invalid id, closed, or expired"));
+          containsString("Connection is closed"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index a8867c8..03f2aa1 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -16,7 +16,10 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
 import org.apache.calcite.avatica.Meta.MetaResultSet;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest;
 import org.apache.calcite.avatica.remote.TypedValue;
 
 import java.sql.Array;
@@ -57,6 +60,9 @@ public abstract class AvaticaConnection implements Connection {
    * the number of rows modified. */
   public static final String ROWCOUNT_COLUMN_NAME = "ROWCOUNT";
 
+  public static final String NUM_EXECUTE_RETRIES_KEY = "avatica.statement.retries";
+  public static final String NUM_EXECUTE_RETRIES_DEFAULT = "5";
+
   /** The name of the sole column returned by an EXPLAIN statement.
    *
    * <p>Actually Avatica does not care what this column is called, but here is
@@ -80,6 +86,7 @@ public abstract class AvaticaConnection implements Connection {
   public final Map<InternalProperty, Object> properties = new HashMap<>();
   public final Map<Integer, AvaticaStatement> statementMap =
       new ConcurrentHashMap<>();
+  protected final long maxRetriesPerExecute;
 
   /**
    * Creates an AvaticaConnection.
@@ -105,6 +112,14 @@ public abstract class AvaticaConnection implements Connection {
     this.meta = driver.createMeta(this);
     this.metaData = factory.newDatabaseMetaData(this);
     this.holdability = metaData.getResultSetHoldability();
+    this.maxRetriesPerExecute = getNumStatementRetries(info);
+  }
+
+  /** Returns the retry limit configured under {@link #NUM_EXECUTE_RETRIES_KEY} in
+   * {@code props}, falling back to {@link #NUM_EXECUTE_RETRIES_DEFAULT} when it is not set. */
+  long getNumStatementRetries(Properties props) {
+    return Long.parseLong(Objects.requireNonNull(props)
+        .getProperty(NUM_EXECUTE_RETRIES_KEY, NUM_EXECUTE_RETRIES_DEFAULT));
   }
 
   /** Returns a view onto this connection's configuration properties. Code
@@ -116,6 +131,14 @@ public abstract class AvaticaConnection implements Connection {
     return new ConnectionConfigImpl(info);
   }
 
+  /**
+   * Opens the connection on the server.
+   */
+  public void openConnection() {
+    // Open the connection on the server
+    this.meta.openConnection(handle, OpenConnectionRequest.serializeProperties(info));
+  }
+
   // Connection methods
 
   public AvaticaStatement createStatement() throws SQLException {
@@ -412,11 +435,12 @@ public abstract class AvaticaConnection implements Connection {
    * @param statement     Statement
    * @param signature     Prepared query
    * @param firstFrame    First frame of rows, or null if we need to execute
+   * @param state         The state used to create the given result
    * @return Result set
    * @throws java.sql.SQLException if a database error occurs
    */
   protected ResultSet executeQueryInternal(AvaticaStatement statement,
-      Meta.Signature signature, Meta.Frame firstFrame) throws SQLException {
+      Meta.Signature signature, Meta.Frame firstFrame, QueryState state) throws SQLException {
     // Close the previous open result set, if there is one.
     Meta.Frame frame = firstFrame;
     Meta.Signature signature2 = signature;
@@ -453,8 +477,9 @@ public abstract class AvaticaConnection implements Connection {
       if (frame == null && signature2 == null && statement.updateCount != -1) {
         statement.openResultSet = null;
       } else {
+        // Duplicative SQL, to support non-prepared statements
         statement.openResultSet =
-            factory.newResultSet(statement, signature2, timeZone, frame);
+            factory.newResultSet(statement, state, signature2, timeZone, frame);
       }
     }
     // Release the monitor before executing, to give another thread the
@@ -502,8 +527,8 @@ public abstract class AvaticaConnection implements Connection {
   }
 
   protected Meta.ExecuteResult prepareAndExecuteInternal(
-      final AvaticaStatement statement, String sql, long maxRowCount)
-      throws SQLException {
+      final AvaticaStatement statement, final String sql, long maxRowCount)
+      throws SQLException, NoSuchStatementException {
     final Meta.PrepareCallback callback =
         new Meta.PrepareCallback() {
           public Object getMonitor() {
@@ -531,7 +556,7 @@ public abstract class AvaticaConnection implements Connection {
               statement.updateCount = updateCount;
             } else {
               final TimeZone timeZone = getTimeZone();
-              statement.openResultSet = factory.newResultSet(statement,
+              statement.openResultSet = factory.newResultSet(statement, new QueryState(sql),
                   signature, timeZone, firstFrame);
             }
           }
@@ -546,13 +571,13 @@ public abstract class AvaticaConnection implements Connection {
     return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback);
   }
 
-  protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
+  protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState state)
       throws SQLException {
     final Meta.StatementHandle h = new Meta.StatementHandle(
         metaResultSet.connectionId, metaResultSet.statementId, null);
     final AvaticaStatement statement = lookupStatement(h);
     ResultSet resultSet = executeQueryInternal(statement, metaResultSet.signature.sanitize(),
-        metaResultSet.firstFrame);
+        metaResultSet.firstFrame, state);
     if (metaResultSet.ownStatement) {
       resultSet.getStatement().closeOnCompletion();
     }
@@ -617,6 +642,44 @@ public abstract class AvaticaConnection implements Connection {
       return connection.meta;
     }
   }
+
+  /**
+   * A Callable-like interface but without a "throws Exception".
+   *
+   * @param <T> The return type from {@code call}.
+   */
+  public interface CallableWithoutException<T> {
+    T call();
+  }
+
+  /**
+   * Invokes the given "callable", retrying the call when the server responds with an error
+   * denoting that the connection is missing on the server. After each such failure the
+   * connection is re-opened on the server via {@link #openConnection()}. Any other failure is
+   * rethrown immediately. At least one attempt is always made, even if the limit is &lt; 1.
+   *
+   * @param callable The function to invoke.
+   * @return The value from the result of the callable.
+   */
+  public <T> T invokeWithRetries(CallableWithoutException<T> callable) {
+    // Clamp to one attempt so a zero or negative setting cannot skip the call entirely.
+    final long attempts = Math.max(1L, maxRetriesPerExecute);
+    RuntimeException lastException = null;
+    for (long i = 0; i < attempts; i++) {
+      try {
+        return callable.call();
+      } catch (AvaticaClientRuntimeException e) {
+        if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE != e.getErrorCode()) {
+          throw e;
+        }
+        // The server no longer knows this connection: re-create it and try again.
+        lastException = e;
+        this.openConnection();
+      }
+    }
+    // Every attempt hit a missing-connection error; report the most recent one.
+    throw lastException;
+  }
 }
 
 // End AvaticaConnection.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
index 27a2687..b57f36c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
+import org.apache.calcite.avatica.remote.MetaDataOperation;
 import org.apache.calcite.avatica.util.Casing;
 import org.apache.calcite.avatica.util.Quoting;
 
@@ -182,28 +184,53 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public String getSQLKeywords() throws SQLException {
-    return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS
-        .getProp(connection.meta, connection.handle, String.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<String>() {
+          public String call() {
+            return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS
+                .getProp(connection.meta, connection.handle, String.class);
+          }
+        });
   }
 
   public String getNumericFunctions() throws SQLException {
-    return Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS
-        .getProp(connection.meta, connection.handle, String.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<String>() {
+          public String call() {
+            return Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS
+                .getProp(connection.meta, connection.handle, String.class);
+          }
+        });
   }
 
   public String getStringFunctions() throws SQLException {
-    return Meta.DatabaseProperty.GET_STRING_FUNCTIONS
-        .getProp(connection.meta, connection.handle, String.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<String>() {
+          public String call() {
+            return Meta.DatabaseProperty.GET_STRING_FUNCTIONS
+                .getProp(connection.meta, connection.handle, String.class);
+          }
+        });
   }
 
   public String getSystemFunctions() throws SQLException {
-    return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS
-        .getProp(connection.meta, connection.handle, String.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<String>() {
+          public String call() {
+            return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS
+                .getProp(connection.meta, connection.handle, String.class);
+          }
+        });
   }
 
   public String getTimeDateFunctions() throws SQLException {
-    return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS
-        .getProp(connection.meta, connection.handle, String.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<String>() {
+          public String call() {
+            return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS
+                .getProp(connection.meta, connection.handle, String.class);
+          }
+        });
   }
 
   public String getSearchStringEscape() throws SQLException {
@@ -234,8 +261,7 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
     return true;
   }
 
-  public boolean supportsConvert(
-      int fromType, int toType) throws SQLException {
+  public boolean supportsConvert(int fromType, int toType) throws SQLException {
     return false; // TODO: more detail
   }
 
@@ -528,8 +554,13 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public int getDefaultTransactionIsolation() throws SQLException {
-    return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION
-        .getProp(connection.meta, connection.handle, Integer.class);
+    return connection.invokeWithRetries(
+        new CallableWithoutException<Integer>() {
+          public Integer call() {
+            return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION
+                .getProp(connection.meta, connection.handle, Integer.class);
+          }
+        });
   }
 
   public boolean supportsTransactions() throws SQLException {
@@ -560,33 +591,90 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getProcedures(
-      String catalog,
-      String schemaPattern,
-      String procedureNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern),
-            pat(procedureNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String procedureNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern),
+                        pat(procedureNamePattern)),
+                    new QueryState(MetaDataOperation.GET_PROCEDURES, catalog, schemaPattern,
+                        procedureNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getProcedureColumns(
-      String catalog,
-      String schemaPattern,
-      String procedureNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getProcedureColumns(connection.handle, catalog, pat(schemaPattern),
-            pat(procedureNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String procedureNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getProcedureColumns(connection.handle, catalog,
+                        pat(schemaPattern), pat(procedureNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_PROCEDURE_COLUMNS, catalog, schemaPattern,
+                        procedureNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getTables(
-      String catalog,
+      final String catalog,
       final String schemaPattern,
-      String tableNamePattern,
-      String[] types) throws SQLException {
-    List<String> typeList = types == null ? null : Arrays.asList(types);
-    return connection.createResultSet(
-        connection.meta.getTables(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern), typeList));
+      final String tableNamePattern,
+      final String[] types) throws SQLException {
+    final List<String> typeList = types == null ? null : Arrays.asList(types);
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getTables(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern), typeList),
+                    new QueryState(MetaDataOperation.GET_TABLES, catalog, schemaPattern,
+                        tableNamePattern, types));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   private static Meta.Pat pat(String schemaPattern) {
@@ -594,11 +682,30 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getSchemas(
-      String catalog, String schemaPattern) throws SQLException {
+      final String catalog, final String schemaPattern) throws SQLException {
     // TODO: add a 'catch ... throw new SQLException' logic to this and other
     // getXxx methods. Right now any error will throw a RuntimeException
-    return connection.createResultSet(
-        connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern)));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern)),
+                    new QueryState(MetaDataOperation.GET_SCHEMAS_WITH_ARGS, catalog,
+                        schemaPattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getSchemas() throws SQLException {
@@ -606,103 +713,342 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getCatalogs() throws SQLException {
-    return connection.createResultSet(connection.meta.getCatalogs(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(connection.meta.getCatalogs(connection.handle),
+                    new QueryState(MetaDataOperation.GET_CATALOGS));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getTableTypes() throws SQLException {
-    return connection.createResultSet(connection.meta.getTableTypes(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(connection.meta.getTableTypes(connection.handle),
+                    new QueryState(MetaDataOperation.GET_TABLE_TYPES));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getColumns(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getColumns(connection.handle,
-            catalog, pat(schemaPattern),
-            pat(tableNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getColumns(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_COLUMNS, catalog, schemaPattern,
+                        tableNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getColumnPrivileges(
-      String catalog,
-      String schema,
-      String table,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table,
-            pat(columnNamePattern)));
+      final String catalog,
+      final String schema,
+      final String table,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table,
+                        pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_COLUMN_PRIVILEGES, catalog, schema, table,
+                        columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getTablePrivileges(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getTablePrivileges(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getTablePrivileges(connection.handle, catalog,
+                        pat(schemaPattern), pat(tableNamePattern)),
+                    new QueryState(MetaDataOperation.GET_TABLE_PRIVILEGES, catalog, schemaPattern,
+                        tableNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getBestRowIdentifier(
-      String catalog,
-      String schema,
-      String table,
-      int scope,
-      boolean nullable) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table, scope,
-            nullable));
+      final String catalog,
+      final String schema,
+      final String table,
+      final int scope,
+      final boolean nullable) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table,
+                        scope, nullable),
+                    new QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, catalog, schema,
+                        table, scope, nullable)); // mirrors the meta call's arguments
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getVersionColumns(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getVersionColumns(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getVersionColumns(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_VERSION_COLUMNS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getPrimaryKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_PRIMARY_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getImportedKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getImportedKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getImportedKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_IMPORTED_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getExportedKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getExportedKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getExportedKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_EXPORTED_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getCrossReference(
-      String parentCatalog,
-      String parentSchema,
-      String parentTable,
-      String foreignCatalog,
-      String foreignSchema,
-      String foreignTable) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getCrossReference(connection.handle, parentCatalog, parentSchema,
-            parentTable, foreignCatalog, foreignSchema, foreignTable));
+      final String parentCatalog,
+      final String parentSchema,
+      final String parentTable,
+      final String foreignCatalog,
+      final String foreignSchema,
+      final String foreignTable) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getCrossReference(connection.handle, parentCatalog,
+                        parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable),
+                    new QueryState(MetaDataOperation.GET_CROSS_REFERENCE, parentCatalog,
+                        parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getTypeInfo() throws SQLException {
-    return connection.createResultSet(connection.meta.getTypeInfo(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(connection.meta.getTypeInfo(connection.handle),
+                    new QueryState(MetaDataOperation.GET_TYPE_INFO));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getIndexInfo(
-      String catalog,
-      String schema,
-      String table,
-      boolean unique,
-      boolean approximate) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
-            approximate));
+      final String catalog,
+      final String schema,
+      final String table,
+      final boolean unique,
+      final boolean approximate) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
+                        approximate),
+                    new QueryState(MetaDataOperation.GET_INDEX_INFO, catalog, schema, table, unique,
+                        approximate));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public boolean supportsResultSetType(int type) throws SQLException {
@@ -756,13 +1102,32 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getUDTs(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern,
-      int[] types) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern), types));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern,
+      final int[] types) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern), types),
+                    new QueryState(MetaDataOperation.GET_UDTS, catalog, schemaPattern,
+                        typeNamePattern, types));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public Connection getConnection() throws SQLException {
@@ -786,31 +1151,88 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getSuperTypes(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern)),
+                    new QueryState(MetaDataOperation.GET_SUPER_TYPES, catalog, schemaPattern,
+                        typeNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getSuperTables(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern)),
+                    new QueryState(MetaDataOperation.GET_SUPER_TABLES, catalog, schemaPattern,
+                        tableNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getAttributes(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern,
-      String attributeNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern), pat(attributeNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern,
+      final String attributeNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern), pat(attributeNamePattern)),
+                    new QueryState(MetaDataOperation.GET_ATTRIBUTES, catalog, schemaPattern,
+                        typeNamePattern, attributeNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public boolean supportsResultSetHoldability(int holdability)
@@ -864,37 +1286,112 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }
 
   public ResultSet getClientInfoProperties() throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getClientInfoProperties(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getClientInfoProperties(connection.handle),
+                    new QueryState(MetaDataOperation.GET_CLIENT_INFO_PROPERTIES));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getFunctions(
-      String catalog,
-      String schemaPattern,
-      String functionNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
-            pat(functionNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String functionNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
+                        pat(functionNamePattern)),
+                    new QueryState(MetaDataOperation.GET_FUNCTIONS, catalog, schemaPattern,
+                        functionNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getFunctionColumns(
-      String catalog,
-      String schemaPattern,
-      String functionNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getFunctionColumns(connection.handle, catalog, pat(schemaPattern),
-            pat(functionNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String functionNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getFunctionColumns(connection.handle, catalog,
+                        pat(schemaPattern), pat(functionNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_FUNCTION_COLUMNS, catalog,
+                        schemaPattern, functionNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public ResultSet getPseudoColumns(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_PSEUDO_COLUMNS, catalog, schemaPattern,
+                        tableNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }
 
   public boolean generatedKeyAlwaysReturned() throws SQLException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
index c6d2ede..1a2a97f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
@@ -51,13 +51,14 @@ public interface AvaticaFactory {
    * {@link AvaticaResultSet#execute()} on it.
    *
    * @param statement Statement
+   * @param state The state used to create this result set
    * @param signature Prepared statement
    * @param timeZone Time zone
    * @param firstFrame Frame containing the first (or perhaps only) rows in the
    *                   result, or null if an execute/fetch is required
    * @return Result set
    */
-  AvaticaResultSet newResultSet(AvaticaStatement statement,
+  AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state,
       Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame)
       throws SQLException;