You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/03/28 22:23:50 UTC

[01/10] incubator-calcite git commit: [CALCITE-648] Update ProjectMergeRule description for new naming convention (Jinfeng Ni)

Repository: incubator-calcite
Updated Branches:
  refs/heads/master 9e0be8bca -> e2833a297


[CALCITE-648] Update ProjectMergeRule description for new naming convention (Jinfeng Ni)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/a24b3c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/a24b3c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/a24b3c1f

Branch: refs/heads/master
Commit: a24b3c1f0877db1da64108af9bceb3e7023dc049
Parents: 9e0be8b
Author: Julian Hyde <jh...@apache.org>
Authored: Thu Mar 26 19:35:34 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Mar 26 19:35:34 2015 -0700

----------------------------------------------------------------------
 .../calcite/rel/rules/ProjectMergeRule.java     | 23 ++++++++++----------
 .../org/apache/calcite/tools/PlannerTest.java   | 17 +++++++++++++--
 2 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a24b3c1f/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
index 96559cf..cef654e 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
@@ -62,18 +62,17 @@ public class ProjectMergeRule extends RelOptRule {
    *
    * @param force Whether to always merge projects
    */
-  public ProjectMergeRule(boolean force, ProjectFactory pFactory) {
+  public ProjectMergeRule(boolean force, ProjectFactory projectFactory) {
     super(
         operand(Project.class,
             operand(Project.class, any())),
-        "ProjectMergeRule" + (force ? ": force mode" : ""));
+             "ProjectMergeRule" + (force ? ":force_mode" : ""));
     this.force = force;
-    projectFactory = pFactory;
+    this.projectFactory = projectFactory;
   }
 
   //~ Methods ----------------------------------------------------------------
 
-  // implement RelOptRule
   public void onMatch(RelOptRuleCall call) {
     Project topProject = call.rel(0);
     Project bottomProject = call.rel(1);
@@ -121,11 +120,11 @@ public class ProjectMergeRule extends RelOptRule {
             rexBuilder);
 
     // create a RexProgram for the topmost project
-    List<RexNode> projExprs = topProject.getProjects();
+    final List<RexNode> projects = topProject.getProjects();
     RexProgram topProgram =
         RexProgram.create(
             bottomProject.getRowType(),
-            projExprs,
+            projects,
             null,
             topProject.getRowType(),
             rexBuilder);
@@ -139,16 +138,16 @@ public class ProjectMergeRule extends RelOptRule {
 
     // re-expand the topmost projection expressions, now that they
     // reference the children of the bottom-most project
-    int nProjExprs = projExprs.size();
-    List<RexNode> newProjExprs = new ArrayList<RexNode>();
-    List<RexLocalRef> projList = mergedProgram.getProjectList();
-    for (int i = 0; i < nProjExprs; i++) {
-      newProjExprs.add(mergedProgram.expandLocalRef(projList.get(i)));
+    final int projectCount = projects.size();
+    final List<RexNode> newProjects = new ArrayList<>();
+    List<RexLocalRef> projectRefs = mergedProgram.getProjectList();
+    for (int i = 0; i < projectCount; i++) {
+      newProjects.add(mergedProgram.expandLocalRef(projectRefs.get(i)));
     }
 
     // replace the two projects with a combined projection
     RelNode newProjectRel = projectFactory.createProject(
-        bottomProject.getInput(), newProjExprs,
+        bottomProject.getInput(), newProjects,
         topProject.getRowType().getFieldNames());
 
     call.transformTo(newProjectRel);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a24b3c1f/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
index 55f287c..ff6094a 100644
--- a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
@@ -38,9 +38,11 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
 import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
@@ -473,7 +475,7 @@ public class PlannerTest {
             FilterMergeRule.INSTANCE,
             EnumerableRules.ENUMERABLE_FILTER_RULE,
             EnumerableRules.ENUMERABLE_PROJECT_RULE);
-    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
 
@@ -979,7 +981,7 @@ public class PlannerTest {
       + "order by ps.psPartkey, ps.psSupplyCost) t \n"
       + "order by t.psPartkey";
 
-    List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
     final SqlParser.Config parserConfig =
@@ -1005,6 +1007,17 @@ public class PlannerTest {
         + "        LogicalProject(psPartkey=[$0], psSupplyCost=[$1])\n"
         + "          EnumerableTableScan(table=[[tpch, partsupp]])\n"));
   }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-648">[CALCITE-649]
+   * Update ProjectMergeRule description for new naming convention</a>. */
+  @Test public void testMergeProjectForceMode() throws Exception {
+    RuleSet ruleSet =
+        RuleSets.ofList(
+            new ProjectMergeRule(true, RelFactories.DEFAULT_PROJECT_FACTORY));
+    Planner planner = getPlanner(null, Programs.of(ruleSet));
+    planner.close();
+  }
 }
 
 // End PlannerTest.java


[07/10] incubator-calcite git commit: [CALCITE-640] Avatica server should expire stale connections/statements (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-640] Avatica server should expire stale connections/statements (Nick Dimiduk)

Close apache/incubator-calcite#65


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0f824f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0f824f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0f824f81

Branch: refs/heads/master
Commit: 0f824f819234c5d08aac88f73ba50894b8b23a54
Parents: c446e02
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:25:00 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/calcite/avatica/server/Main.java |  14 +-
 avatica/pom.xml                                 |   4 +
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 198 ++++++++++++++++---
 .../calcite/avatica/remote/JsonService.java     |   2 -
 .../calcite/avatica/remote/LocalService.java    |   7 +-
 .../calcite/avatica/remote/RemoteMeta.java      |   4 +-
 .../calcite/avatica/test/RemoteDriverTest.java  |  39 ++--
 7 files changed, 217 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
index d3a24f0..f8416d6 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -36,10 +36,16 @@ public class Main {
   }
 
   /**
-   * Create and start an {@link HttpServer}.
-   * @param args <br />&nbsp;&nbsp;args[0]: the {@link Meta.Factory} class name.<br />
-   *             &nbsp;&nbsp;args[1:]: arguments passed along to
-   *             {@link Meta.Factory#create(java.util.List)}
+   * Creates and starts an {@link HttpServer}.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   *   <li>args[0]: the {@link Meta.Factory} class name
+   *   <li>args[1+]: arguments passed along to
+   *   {@link Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
    */
   public static HttpServer start(String[] args)
       throws ClassNotFoundException, InstantiationException,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 0e58323..0bacc4a 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 24bdbda..627a0d7 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -23,6 +23,11 @@ import org.apache.calcite.avatica.Meta;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 import java.lang.reflect.Array;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
@@ -43,6 +48,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 /** Implementation of {@link Meta} upon an existing JDBC data source. */
 public class JdbcMeta implements Meta {
@@ -81,14 +87,64 @@ public class JdbcMeta implements Meta {
     SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
   }
 
+  //
+  // Constants for connection cache settings.
+  //
+
+  private static final String CONN_CACHE_KEY_BASE = "avatica.connectioncache";
+  /** JDBC connection property for setting connection cache concurrency level. */
+  public static final String CONN_CACHE_CONCURRENCY_KEY =
+      CONN_CACHE_KEY_BASE + ".concurrency";
+  public static final String DEFAULT_CONN_CACHE_CONCURRENCY_LEVEL = "10";
+  /** JDBC connection property for setting connection cache initial capacity. */
+  public static final String CONN_CACHE_INITIAL_CAPACITY_KEY =
+      CONN_CACHE_KEY_BASE + ".initialcapacity";
+  public static final String DEFAULT_CONN_CACHE_INITIAL_CAPACITY = "100";
+  /** JDBC connection property for setting connection cache maximum capacity. */
+  public static final String CONN_CACHE_MAX_CAPACITY_KEY =
+      CONN_CACHE_KEY_BASE + ".maxcapacity";
+  public static final String DEFAULT_CONN_CACHE_MAX_CAPACITY = "1000";
+  /** JDBC connection property for setting connection cache expiration duration. */
+  public static final String CONN_CACHE_EXPIRY_DURATION_KEY =
+      CONN_CACHE_KEY_BASE + ".expirydiration";
+  public static final String DEFAULT_CONN_CACHE_EXPIRY_DURATION = "10";
+  /** JDBC connection property for setting connection cache expiration unit. */
+  public static final String CONN_CACHE_EXPIRY_UNIT_KEY = CONN_CACHE_KEY_BASE + ".expiryunit";
+  public static final String DEFAULT_CONN_CACHE_EXPIRY_UNIT = TimeUnit.MINUTES.name();
+
+  //
+  // Constants for statement cache settings.
+  //
+
+  private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache";
+  /** JDBC connection property for setting connection cache concurrency level. */
+  public static final String STMT_CACHE_CONCURRENCY_KEY =
+      STMT_CACHE_KEY_BASE + ".concurrency";
+  public static final String DEFAULT_STMT_CACHE_CONCURRENCY_LEVEL = "100";
+  /** JDBC connection property for setting connection cache initial capacity. */
+  public static final String STMT_CACHE_INITIAL_CAPACITY_KEY =
+      STMT_CACHE_KEY_BASE + ".initialcapacity";
+  public static final String DEFAULT_STMT_CACHE_INITIAL_CAPACITY = "1000";
+  /** JDBC connection property for setting connection cache maximum capacity. */
+  public static final String STMT_CACHE_MAX_CAPACITY_KEY =
+      STMT_CACHE_KEY_BASE + ".maxcapacity";
+  public static final String DEFAULT_STMT_CACHE_MAX_CAPACITY = "10000";
+  /** JDBC connection property for setting connection cache expiration duration. */
+  public static final String STMT_CACHE_EXPIRY_DURATION_KEY =
+      STMT_CACHE_KEY_BASE + ".expirydiration";
+  public static final String DEFAULT_STMT_CACHE_EXPIRY_DURATION = "5";
+  /** JDBC connection property for setting connection cache expiration unit. */
+  public static final String STMT_CACHE_EXPIRY_UNIT_KEY = STMT_CACHE_KEY_BASE + ".expiryunit";
+  public static final String DEFAULT_STMT_CACHE_EXPIRY_UNIT = TimeUnit.MINUTES.name();
+
   private static final String DEFAULT_CONN_ID =
       UUID.fromString("00000000-0000-0000-0000-000000000000").toString();
 
   private final String url;
   private final Properties info;
   private final Connection connection; // TODO: remove default connection
-  private final Map<String, Connection> connectionMap = new HashMap<>();
-  private final Map<Integer, StatementInfo> statementMap = new HashMap<>();
+  private final Cache<String, Connection> connectionCache;
+  private final Cache<Integer, StatementInfo> statementCache;
 
   /**
    * Convert from JDBC metadata to Avatica columns.
@@ -148,6 +204,58 @@ public class JdbcMeta implements Meta {
     return signature(metaData, null, null);
   }
 
+  /** Callback for {@link #connectionCache} member expiration. */
+  private class ConnectionExpiryHandler
+      implements RemovalListener<String, Connection> {
+
+    public void onRemoval(RemovalNotification<String, Connection> notification) {
+      String connectionId = notification.getKey();
+      Connection doomed = notification.getValue();
+      // is String.equals() more efficient?
+      if (notification.getValue() == connection) {
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Expiring connection " + connectionId + " because "
+            + notification.getCause());
+      }
+      try {
+        if (doomed != null) {
+          doomed.close();
+        }
+      } catch (Throwable t) {
+        LOG.info("Exception thrown while expiring connection " + connectionId, t);
+      }
+    }
+  }
+
+  /** Callback for {@link #statementCache} member expiration. */
+  private class StatementExpiryHandler
+      implements RemovalListener<Integer, StatementInfo> {
+    public void onRemoval(RemovalNotification<Integer, StatementInfo> notification) {
+      Integer stmtId = notification.getKey();
+      StatementInfo doomed = notification.getValue();
+      if (doomed == null) {
+        // log/throw?
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Expiring statement " + stmtId + " because "
+            + notification.getCause());
+      }
+      try {
+        if (doomed.resultSet != null) {
+          doomed.resultSet.close();
+        }
+        if (doomed.statement != null) {
+          doomed.statement.close();
+        }
+      } catch (Throwable t) {
+        LOG.info("Exception thrown while expiring statement " + stmtId);
+      }
+    }
+  }
+
   /**
    * @param url a database url of the form
    *  <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
@@ -184,7 +292,48 @@ public class JdbcMeta implements Meta {
     this.url = url;
     this.info = info;
     this.connection = DriverManager.getConnection(url, info);
-    this.connectionMap.put(DEFAULT_CONN_ID, connection);
+
+    int concurrencyLevel = Integer.parseInt(
+        info.getProperty(CONN_CACHE_CONCURRENCY_KEY, DEFAULT_CONN_CACHE_CONCURRENCY_LEVEL));
+    int initialCapacity = Integer.parseInt(
+        info.getProperty(CONN_CACHE_INITIAL_CAPACITY_KEY, DEFAULT_CONN_CACHE_INITIAL_CAPACITY));
+    long maxCapacity = Long.parseLong(
+        info.getProperty(CONN_CACHE_MAX_CAPACITY_KEY, DEFAULT_CONN_CACHE_MAX_CAPACITY));
+    long connectionExpiryDuration = Long.parseLong(
+        info.getProperty(CONN_CACHE_EXPIRY_DURATION_KEY, DEFAULT_CONN_CACHE_EXPIRY_DURATION));
+    TimeUnit connectionExpiryUnit = TimeUnit.valueOf(
+        info.getProperty(CONN_CACHE_EXPIRY_UNIT_KEY, DEFAULT_CONN_CACHE_EXPIRY_UNIT));
+    this.connectionCache = CacheBuilder.newBuilder()
+        .concurrencyLevel(concurrencyLevel)
+        .initialCapacity(initialCapacity)
+        .maximumSize(maxCapacity)
+        .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit)
+        .removalListener(new ConnectionExpiryHandler())
+        .build();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("instantiated connection cache: " + connectionCache.stats());
+    }
+
+    concurrencyLevel = Integer.parseInt(
+        info.getProperty(STMT_CACHE_CONCURRENCY_KEY, DEFAULT_STMT_CACHE_CONCURRENCY_LEVEL));
+    initialCapacity = Integer.parseInt(
+        info.getProperty(STMT_CACHE_INITIAL_CAPACITY_KEY, DEFAULT_STMT_CACHE_INITIAL_CAPACITY));
+    maxCapacity = Long.parseLong(
+        info.getProperty(STMT_CACHE_MAX_CAPACITY_KEY, DEFAULT_STMT_CACHE_MAX_CAPACITY));
+    connectionExpiryDuration = Long.parseLong(
+        info.getProperty(STMT_CACHE_EXPIRY_DURATION_KEY, DEFAULT_STMT_CACHE_EXPIRY_DURATION));
+    connectionExpiryUnit = TimeUnit.valueOf(
+        info.getProperty(STMT_CACHE_EXPIRY_UNIT_KEY, DEFAULT_STMT_CACHE_EXPIRY_UNIT));
+    this.statementCache = CacheBuilder.newBuilder()
+        .concurrencyLevel(concurrencyLevel)
+        .initialCapacity(initialCapacity)
+        .maximumSize(maxCapacity)
+        .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit)
+        .removalListener(new StatementExpiryHandler())
+        .build();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("instantiated statement cache: " + statementCache.stats());
+    }
   }
 
   public String getSqlKeywords() {
@@ -411,10 +560,12 @@ public class JdbcMeta implements Meta {
   }
 
   protected Connection getConnection(String id) throws SQLException {
-    if (connectionMap.get(id) == null) {
-      connectionMap.put(id, DriverManager.getConnection(url, info));
+    Connection conn = connectionCache.getIfPresent(id);
+    if (conn == null) {
+      conn = DriverManager.getConnection(url, info);
+      connectionCache.put(id, conn);
     }
-    return connectionMap.get(id);
+    return conn;
   }
 
   public StatementHandle createStatement(ConnectionHandle ch) {
@@ -422,7 +573,7 @@ public class JdbcMeta implements Meta {
       final Connection conn = getConnection(ch.id);
       final Statement statement = conn.createStatement();
       final int id = System.identityHashCode(statement);
-      statementMap.put(id, new StatementInfo(statement));
+      statementCache.put(id, new StatementInfo(statement));
       StatementHandle h = new StatementHandle(ch.id, id, null);
       if (LOG.isTraceEnabled()) {
         LOG.trace("created statement " + h);
@@ -434,8 +585,8 @@ public class JdbcMeta implements Meta {
   }
 
   @Override public void closeStatement(StatementHandle h) {
-    Statement stmt = statementMap.get(h.id).statement;
-    if (stmt == null) {
+    StatementInfo info = statementCache.getIfPresent(h.id);
+    if (info == null || info.statement == null) {
       LOG.debug("client requested close unknown statement " + h);
       return;
     }
@@ -443,19 +594,19 @@ public class JdbcMeta implements Meta {
       LOG.trace("closing statement " + h);
     }
     try {
-      boolean isOwned =
-          stmt.getConnection() == connectionMap.get(h.connectionId);
-      stmt.close();
-      assert isOwned : "no connection found while closing " + h;
+      if (info.resultSet != null) {
+        info.resultSet.close();
+      }
+      info.statement.close();
     } catch (SQLException e) {
       throw propagate(e);
     } finally {
-      statementMap.remove(h.id);
+      statementCache.invalidate(h.id);
     }
   }
 
   @Override public void closeConnection(ConnectionHandle ch) {
-    Connection conn = connectionMap.get(ch.id);
+    Connection conn = connectionCache.getIfPresent(ch.id);
     if (conn == null) {
       LOG.debug("client requested close unknown connection " + ch);
       return;
@@ -468,7 +619,7 @@ public class JdbcMeta implements Meta {
     } catch (SQLException e) {
       throw propagate(e);
     } finally {
-      connectionMap.remove(ch.id);
+      connectionCache.invalidate(ch.id);
     }
   }
 
@@ -488,9 +639,10 @@ public class JdbcMeta implements Meta {
       final Connection conn = getConnection(ch.id);
       final PreparedStatement statement = conn.prepareStatement(sql);
       final int id = System.identityHashCode(statement);
-      statementMap.put(id, new StatementInfo(statement));
-      StatementHandle h = new StatementHandle(ch.id, id, signature(
-          statement.getMetaData(), statement.getParameterMetaData(), sql));
+      statementCache.put(id, new StatementInfo(statement));
+      StatementHandle h = new StatementHandle(ch.id, id,
+          signature(statement.getMetaData(), statement.getParameterMetaData(),
+              sql));
       if (LOG.isTraceEnabled()) {
         LOG.trace("prepared statement " + h);
       }
@@ -507,7 +659,7 @@ public class JdbcMeta implements Meta {
       final PreparedStatement statement = connection.prepareStatement(sql);
       final int id = System.identityHashCode(statement);
       final StatementInfo info = new StatementInfo(statement);
-      statementMap.put(id, info);
+      statementCache.put(id, info);
       info.resultSet = statement.executeQuery();
       MetaResultSet mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
       if (LOG.isTraceEnabled()) {
@@ -525,10 +677,10 @@ public class JdbcMeta implements Meta {
     if (LOG.isTraceEnabled()) {
       LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
     }
-    final StatementInfo statementInfo = statementMap.get(h.id);
     try {
-      assert statementInfo.statement.getConnection()
-          == connectionMap.get(h.connectionId);
+      final StatementInfo statementInfo = Objects.requireNonNull(
+          statementCache.getIfPresent(h.id),
+          "Statement not found, potentially expired. " + h);
       if (statementInfo.resultSet == null || parameterValues != null) {
         if (statementInfo.resultSet != null) {
           statementInfo.resultSet.close();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index dc4268b..b8d640d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -130,7 +130,6 @@ public abstract class JsonService implements Service {
     }
   }
 
-  @Override
   public CloseStatementResponse apply(CloseStatementRequest request) {
     try {
       return decode(apply(encode(request)), CloseStatementResponse.class);
@@ -139,7 +138,6 @@ public abstract class JsonService implements Service {
     }
   }
 
-  @Override
   public CloseConnectionResponse apply(CloseConnectionRequest request) {
     try {
       return decode(apply(encode(request)), CloseConnectionResponse.class);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 719ef1d..842ebd6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -190,14 +190,13 @@ public class LocalService implements Service {
     return new CreateStatementResponse(h.connectionId, h.id);
   }
 
-  @Override
   public CloseStatementResponse apply(CloseStatementRequest request) {
-    meta.closeStatement(new Meta.StatementHandle(
-        request.connectionId, request.statementId, null));
+    meta.closeStatement(
+        new Meta.StatementHandle(request.connectionId, request.statementId,
+            null));
     return new CloseStatementResponse();
   }
 
-  @Override
   public CloseConnectionResponse apply(CloseConnectionRequest request) {
     meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
     return new CloseConnectionResponse();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e72ad8b..772321c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -118,8 +118,8 @@ class RemoteMeta extends MetaImpl {
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
-        response = service.apply(new Service.PrepareAndExecuteRequest(
-            ch.id, sql, maxRowCount));
+        response = service.apply(
+            new Service.PrepareAndExecuteRequest(ch.id, sql, maxRowCount));
         callback.assign(response.signature, response.firstFrame);
       }
       callback.execute();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index db4b42a..0f6ab8d 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -26,6 +26,8 @@ import org.apache.calcite.avatica.remote.LocalService;
 import org.apache.calcite.avatica.remote.MockJsonService;
 import org.apache.calcite.avatica.remote.Service;
 
+import com.google.common.cache.Cache;
+
 import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
 
 import org.junit.Before;
@@ -200,7 +202,7 @@ public class RemoteDriverTest {
   @Test public void testStatementLifecycle() throws Exception {
     try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
       Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
-      Map<Integer, Statement> serverMap =
+      Cache<Integer, Object> serverMap =
           QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
       assertEquals(0, clientMap.size());
       assertEquals(0, serverMap.size());
@@ -217,27 +219,29 @@ public class RemoteDriverTest {
     final String sql = "select * from (values (1, 'a'))";
     Connection conn1 = ljs();
     Connection conn2 = ljs();
-    Map<String, Connection> connectionMap =
+    Cache<String, Connection> connectionMap =
         QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
             (AvaticaConnection) conn1);
-    assertEquals("should contain at least the default connection",
-        1, connectionMap.size());
+    assertEquals("connection cache should start empty",
+        0, connectionMap.size());
     PreparedStatement conn1stmt1 = conn1.prepareStatement(sql);
     assertEquals(
         "statement creation implicitly creates a connection server-side",
-        2, connectionMap.size());
+        1, connectionMap.size());
     PreparedStatement conn2stmt1 = conn2.prepareStatement(sql);
     assertEquals(
         "statement creation implicitly creates a connection server-side",
-        3, connectionMap.size());
+        2, connectionMap.size());
     AvaticaPreparedStatement s1 = (AvaticaPreparedStatement) conn1stmt1;
     AvaticaPreparedStatement s2 = (AvaticaPreparedStatement) conn2stmt1;
     assertFalse("connection id's should be unique",
         s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
     conn2.close();
-    conn1.close();
     assertEquals("closing a connection closes the server-side connection",
         1, connectionMap.size());
+    conn1.close();
+    assertEquals("closing a connection closes the server-side connection",
+        0, connectionMap.size());
   }
 
   private void checkStatementExecuteQuery(Connection connection)
@@ -318,8 +322,9 @@ public class RemoteDriverTest {
   public static class LocalJdbcServiceFactory implements Service.Factory {
     @Override public Service create(AvaticaConnection connection) {
       try {
-        return new LocalService(new JdbcMeta(CONNECTION_SPEC.url,
-            CONNECTION_SPEC.username, CONNECTION_SPEC.password));
+        return new LocalService(
+            new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
+                CONNECTION_SPEC.password));
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
@@ -355,7 +360,7 @@ public class RemoteDriverTest {
      * statement map from the other side.
      * TODO: refactor tests to replace reflection with package-local access
      */
-    static Map<Integer, Statement>
+    static Cache<Integer, Object>
     getRemoteStatementMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
@@ -371,9 +376,10 @@ public class RemoteDriverTest {
           remoteMetaServiceService.getClass().getDeclaredField("meta");
       remoteMetaServiceServiceMetaF.setAccessible(true);
       JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
-      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementMap");
+      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementCache");
       jdbcMetaStatementMapF.setAccessible(true);
-      return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
+      //noinspection unchecked
+      return (Cache<Integer, Object>) jdbcMetaStatementMapF.get(serverMeta);
     }
 
     /**
@@ -381,7 +387,7 @@ public class RemoteDriverTest {
      * connection map from the other side.
      * TODO: refactor tests to replace reflection with package-local access
      */
-    static Map<String, Connection>
+    static Cache<String, Connection>
     getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
@@ -397,9 +403,10 @@ public class RemoteDriverTest {
           remoteMetaServiceService.getClass().getDeclaredField("meta");
       remoteMetaServiceServiceMetaF.setAccessible(true);
       JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
-      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("connectionMap");
-      jdbcMetaStatementMapF.setAccessible(true);
-      return (Map<String, Connection>) jdbcMetaStatementMapF.get(serverMeta);
+      Field jdbcMetaConnectionCacheF = JdbcMeta.class.getDeclaredField("connectionCache");
+      jdbcMetaConnectionCacheF.setAccessible(true);
+      //noinspection unchecked
+      return (Cache<String, Connection>) jdbcMetaConnectionCacheF.get(serverMeta);
     }
   }
 


[06/10] incubator-calcite git commit: [CALCITE-639] Open up permissions on avatica server components (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-639] Open up permissions on avatica server components (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/c446e02a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/c446e02a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/c446e02a

Branch: refs/heads/master
Commit: c446e02a4cc4f0e174c1a48cdc68773ab0ad85dd
Parents: 18fea1f
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Thu Mar 26 11:50:56 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/calcite/avatica/server/AvaticaHandler.java | 2 +-
 .../java/org/apache/calcite/avatica/server/HttpServer.java     | 4 ++--
 .../src/main/java/org/apache/calcite/avatica/server/Main.java  | 6 ++++++
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
index 0a5258b..bb4605f 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
@@ -30,7 +30,7 @@ import javax.servlet.http.HttpServletResponse;
 /**
  * Jetty handler that executes Avatica JSON request-responses.
  */
-class AvaticaHandler extends AbstractHandler {
+public class AvaticaHandler extends AbstractHandler {
   final JsonHandler jsonHandler;
 
   public AvaticaHandler(Service service) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
index f1505f2..4aa0f24 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
@@ -32,12 +32,12 @@ public class HttpServer {
   private int port = -1;
   private final Handler handler;
 
-  HttpServer(int port, Handler handler) {
+  public HttpServer(int port, Handler handler) {
     this.port = port;
     this.handler = handler;
   }
 
-  void start() {
+  public void start() {
     if (server != null) {
       throw new RuntimeException("Server is already started");
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
index dfb59cd..d3a24f0 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -35,6 +35,12 @@ public class Main {
     server.join();
   }
 
+  /**
+   * Create and start an {@link HttpServer}.
+   * @param args <br />&nbsp;&nbsp;args[0]: the {@link Meta.Factory} class name.<br />
+   *             &nbsp;&nbsp;args[1:]: arguments passed along to
+   *             {@link Meta.Factory#create(java.util.List)}
+   */
   public static HttpServer start(String[] args)
       throws ClassNotFoundException, InstantiationException,
       IllegalAccessException {


[04/10] incubator-calcite git commit: [CALCITE-644] Increase check style line limit to 100 chars (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-644] Increase check style line limit to 100 chars (Nick Dimiduk)

Close apache/incubator-calcite#64


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/aa7cb035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/aa7cb035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/aa7cb035

Branch: refs/heads/master
Commit: aa7cb0355159a78a98573293fd67f6dc692975ae
Parents: 321dc43
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 11:00:35 2015 -0700
Committer: julianhyde <jh...@apache.org>
Committed: Fri Mar 27 13:19:05 2015 -0700

----------------------------------------------------------------------
 src/main/config/checkstyle/checker.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/aa7cb035/src/main/config/checkstyle/checker.xml
----------------------------------------------------------------------
diff --git a/src/main/config/checkstyle/checker.xml b/src/main/config/checkstyle/checker.xml
index 0ff2529..5b2fe9f 100644
--- a/src/main/config/checkstyle/checker.xml
+++ b/src/main/config/checkstyle/checker.xml
@@ -226,7 +226,7 @@ limitations under the License.
     <!-- Lines cannot exceed 80 chars, except if they are hyperlinks
          or strings (possibly preceded by '+' and followed by say '),'. -->
     <module name="LineLength">
-      <property name="max" value="80"/>
+      <property name="max" value="100"/>
       <property name="ignorePattern" value="^import|@see|@link|@BaseMessage|href|^[ +]*&quot;.*&quot;[);,]*$"/>
     </module>
       <!-- Over time, we will revise this down -->


[02/10] incubator-calcite git commit: [CALCITE-649] Extend splitCondition method in RelOptUtil to handle multiple joins on the same key (Jesus Camacho Rodriguez)

Posted by jh...@apache.org.
[CALCITE-649] Extend splitCondition method in RelOptUtil to handle multiple joins on the same key (Jesus Camacho Rodriguez)

Also some code cleanup

Close apache/incubator-calcite#66


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/61eea9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/61eea9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/61eea9ce

Branch: refs/heads/master
Commit: 61eea9ce4f0b02cb84557477bb58fd38d03e9c5c
Parents: a24b3c1
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Fri Mar 27 11:35:28 2015 +0000
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 10:04:50 2015 -0700

----------------------------------------------------------------------
 .../org/apache/calcite/plan/RelOptUtil.java     | 277 +++++++++++--------
 1 file changed, 160 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/61eea9ce/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index d09bf1f..cf68db9 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -174,7 +174,7 @@ public abstract class RelOptUtil {
     if (used.size() == 0) {
       return ImmutableList.of();
     }
-    List<String> result = new ArrayList<String>();
+    final List<String> result = new ArrayList<>();
     for (String s : set) {
       if (used.contains(s) && !result.contains(s)) {
         result.add(s);
@@ -439,7 +439,7 @@ public abstract class RelOptUtil {
       }
 
       // for IN/NOT IN, it needs to output the fields
-      final List<RexNode> exprs = new ArrayList<RexNode>();
+      final List<RexNode> exprs = new ArrayList<>();
       if (subqueryType == SubqueryType.IN) {
         for (int i = 0; i < keyCount; i++) {
           exprs.add(rexBuilder.makeInputRef(ret, i));
@@ -502,17 +502,16 @@ public abstract class RelOptUtil {
         : "rename: field count mismatch: in=" + inputType
         + ", out" + outputType;
 
-    List<Pair<RexNode, String>> renames =
-        new ArrayList<Pair<RexNode, String>>();
+    final List<Pair<RexNode, String>> renames = new ArrayList<>();
     for (Pair<RelDataTypeField, RelDataTypeField> pair
         : Pair.zip(inputFields, outputFields)) {
       final RelDataTypeField inputField = pair.left;
       final RelDataTypeField outputField = pair.right;
       assert inputField.getType().equals(outputField.getType());
+      final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
       renames.add(
-          Pair.of(
-              (RexNode) rel.getCluster().getRexBuilder().makeInputRef(
-                  inputField.getType(),
+          Pair.<RexNode, String>of(
+              rexBuilder.makeInputRef(inputField.getType(),
                   inputField.getIndex()),
               outputField.getName()));
     }
@@ -582,7 +581,7 @@ public abstract class RelOptUtil {
       RelNode rel,
       Integer[] fieldOrdinals) {
     RexNode condition = null;
-    RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
+    final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
     RelDataType rowType = rel.getRowType();
     int n;
     if (fieldOrdinals != null) {
@@ -662,9 +661,9 @@ public abstract class RelOptUtil {
       // nothing to do
       return rel;
     }
-    List<RexNode> castExps =
-        RexUtil.generateCastExpressions(
-            rel.getCluster().getRexBuilder(), castRowType, rowType);
+    final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
+    final List<RexNode> castExps =
+        RexUtil.generateCastExpressions(rexBuilder, castRowType, rowType);
     if (rename) {
       // Use names and types from castRowType.
       return projectFactory.createProject(
@@ -691,8 +690,8 @@ public abstract class RelOptUtil {
       RelOptCluster cluster,
       RelNode rel) {
     // assert (rel.getRowType().getFieldCount() == 1);
-    int aggCallCnt = rel.getRowType().getFieldCount();
-    List<AggregateCall> aggCalls = new ArrayList<AggregateCall>();
+    final int aggCallCnt = rel.getRowType().getFieldCount();
+    final List<AggregateCall> aggCalls = new ArrayList<>();
 
     for (int i = 0; i < aggCallCnt; i++) {
       RelDataType returnType =
@@ -794,7 +793,7 @@ public abstract class RelOptUtil {
       RexNode condition,
       List<Integer> leftKeys,
       List<Integer> rightKeys) {
-    List<RexNode> nonEquiList = new ArrayList<RexNode>();
+    final List<RexNode> nonEquiList = new ArrayList<>();
 
     splitJoinCondition(
         left.getRowType().getFieldCount(),
@@ -819,9 +818,9 @@ public abstract class RelOptUtil {
       RelNode left,
       RelNode right,
       RexNode condition) {
-    final List<Integer> leftKeys = new ArrayList<Integer>();
-    final List<Integer> rightKeys = new ArrayList<Integer>();
-    final List<RexNode> nonEquiList = new ArrayList<RexNode>();
+    final List<Integer> leftKeys = new ArrayList<>();
+    final List<Integer> rightKeys = new ArrayList<>();
+    final List<RexNode> nonEquiList = new ArrayList<>();
     splitJoinCondition(
         left.getRowType().getFieldCount(),
         condition,
@@ -864,29 +863,64 @@ public abstract class RelOptUtil {
       List<RexNode> rightJoinKeys,
       List<Integer> filterNulls,
       List<SqlOperator> rangeOp) {
-    List<RexNode> nonEquiList = new ArrayList<RexNode>();
+    return splitJoinCondition(
+        sysFieldList,
+        ImmutableList.of(leftRel, rightRel),
+        condition,
+        ImmutableList.of(leftJoinKeys, rightJoinKeys),
+        filterNulls,
+        rangeOp);
+  }
+
+  /**
+   * Splits out the equi-join (and optionally, a single non-equi) components
+   * of a join condition, and returns what's left. Projection might be
+   * required by the caller to provide join keys that are not direct field
+   * references.
+   *
+   * @param sysFieldList  list of system fields
+   * @param inputs        join inputs
+   * @param condition     join condition
+   * @param joinKeys      The join keys from the inputs which are equi-join
+   *                      keys
+   * @param filterNulls   The join key positions for which null values will not
+   *                      match. null values only match for the "is not distinct
+   *                      from" condition.
+   * @param rangeOp       if null, only locate equi-joins; otherwise, locate a
+   *                      single non-equi join predicate and return its operator
+   *                      in this list; join keys associated with the non-equi
+   *                      join predicate are at the end of the key lists
+   *                      returned
+   * @return What's left, never null
+   */
+  public static RexNode splitJoinCondition(
+      List<RelDataTypeField> sysFieldList,
+      List<RelNode> inputs,
+      RexNode condition,
+      List<List<RexNode>> joinKeys,
+      List<Integer> filterNulls,
+      List<SqlOperator> rangeOp) {
+    final List<RexNode> nonEquiList = new ArrayList<>();
 
     splitJoinCondition(
         sysFieldList,
-        leftRel,
-        rightRel,
+        inputs,
         condition,
-        leftJoinKeys,
-        rightJoinKeys,
+        joinKeys,
         filterNulls,
         rangeOp,
         nonEquiList);
 
     // Convert the remainders into a list that are AND'ed together.
     return RexUtil.composeConjunction(
-        leftRel.getCluster().getRexBuilder(), nonEquiList, false);
+        inputs.get(0).getCluster().getRexBuilder(), nonEquiList, false);
   }
 
   public static RexNode splitCorrelatedFilterCondition(
       LogicalFilter filter,
       List<RexInputRef> joinKeys,
       List<RexNode> correlatedJoinKeys) {
-    List<RexNode> nonEquiList = new ArrayList<RexNode>();
+    final List<RexNode> nonEquiList = new ArrayList<>();
 
     splitCorrelatedFilterCondition(
         filter,
@@ -905,7 +939,7 @@ public abstract class RelOptUtil {
       List<RexNode> joinKeys,
       List<RexNode> correlatedJoinKeys,
       boolean extractCorrelatedFieldAccess) {
-    List<RexNode> nonEquiList = new ArrayList<RexNode>();
+    final List<RexNode> nonEquiList = new ArrayList<>();
 
     splitCorrelatedFilterCondition(
         filter,
@@ -922,36 +956,33 @@ public abstract class RelOptUtil {
 
   private static void splitJoinCondition(
       List<RelDataTypeField> sysFieldList,
-      RelNode leftRel,
-      RelNode rightRel,
+      List<RelNode> inputs,
       RexNode condition,
-      List<RexNode> leftJoinKeys,
-      List<RexNode> rightJoinKeys,
+      List<List<RexNode>> joinKeys,
       List<Integer> filterNulls,
       List<SqlOperator> rangeOp,
       List<RexNode> nonEquiList) {
     final int sysFieldCount = sysFieldList.size();
-    final int leftFieldCount = leftRel.getRowType().getFieldCount();
-    final int rightFieldCount = rightRel.getRowType().getFieldCount();
-    final int firstLeftField = sysFieldCount;
-    final int firstRightField = sysFieldCount + leftFieldCount;
-    final int totalFieldCount = firstRightField + rightFieldCount;
+    final RelOptCluster cluster = inputs.get(0).getCluster();
+    final RexBuilder rexBuilder = cluster.getRexBuilder();
+    final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
 
-    final List<RelDataTypeField> leftFields =
-        leftRel.getRowType().getFieldList();
-    final List<RelDataTypeField> rightFields =
-        rightRel.getRowType().getFieldList();
-
-    RexBuilder rexBuilder = leftRel.getCluster().getRexBuilder();
-    RelDataTypeFactory typeFactory = leftRel.getCluster().getTypeFactory();
+    int[] firstFieldInputs = new int[inputs.size()];
+    int totalFieldCount = 0;
+    for (int i = 0; i < inputs.size(); i++) {
+      firstFieldInputs[i] = totalFieldCount + sysFieldCount;
+      totalFieldCount += sysFieldCount
+              + inputs.get(i).getRowType().getFieldCount();
+    }
 
     // adjustment array
     int[] adjustments = new int[totalFieldCount];
-    for (int i = firstLeftField; i < firstRightField; i++) {
-      adjustments[i] = -firstLeftField;
-    }
-    for (int i = firstRightField; i < totalFieldCount; i++) {
-      adjustments[i] = -firstRightField;
+    for (int i = 0; i < inputs.size(); i++) {
+      int limit = i == inputs.size() - 1
+              ? totalFieldCount : firstFieldInputs[i + 1];
+      for (int j = firstFieldInputs[i]; j < limit; j++) {
+        adjustments[j] = -firstFieldInputs[i];
+      }
     }
 
     if (condition instanceof RexCall) {
@@ -960,11 +991,9 @@ public abstract class RelOptUtil {
         for (RexNode operand : call.getOperands()) {
           splitJoinCondition(
               sysFieldList,
-              leftRel,
-              rightRel,
+              inputs,
               operand,
-              leftJoinKeys,
-              rightJoinKeys,
+              joinKeys,
               filterNulls,
               rangeOp,
               nonEquiList);
@@ -974,6 +1003,10 @@ public abstract class RelOptUtil {
 
       RexNode leftKey = null;
       RexNode rightKey = null;
+      int leftInput = 0;
+      int rightInput = 0;
+      List<RelDataTypeField> leftFields = null;
+      List<RelDataTypeField> rightFields = null;
       boolean reverse = false;
 
       SqlKind kind = call.getKind();
@@ -995,18 +1028,37 @@ public abstract class RelOptUtil {
         final ImmutableBitSet projRefs0 = InputFinder.bits(op0);
         final ImmutableBitSet projRefs1 = InputFinder.bits(op1);
 
-        if ((projRefs0.nextSetBit(firstRightField) < 0)
-            && (projRefs1.nextSetBit(firstLeftField)
-            >= firstRightField)) {
-          leftKey = op0;
-          rightKey = op1;
-        } else if (
-            (projRefs1.nextSetBit(firstRightField) < 0)
-                && (projRefs0.nextSetBit(firstLeftField)
-                >= firstRightField)) {
-          leftKey = op1;
-          rightKey = op0;
-          reverse = true;
+        boolean foundBothInputs = false;
+        for (int i = 0; i < inputs.size() && !foundBothInputs; i++) {
+          final int lowerLimit = firstFieldInputs[i];
+          final int upperLimit = i == inputs.size() - 1
+                  ? totalFieldCount : firstFieldInputs[i + 1];
+          if (projRefs0.nextSetBit(lowerLimit) < upperLimit
+                  && projRefs0.nextSetBit(lowerLimit) != -1) {
+            if (leftKey == null) {
+              leftKey = op0;
+              leftInput = i;
+              leftFields = inputs.get(leftInput).getRowType().getFieldList();
+            } else {
+              rightKey = op0;
+              rightInput = i;
+              rightFields = inputs.get(rightInput).getRowType().getFieldList();
+              reverse = true;
+              foundBothInputs = true;
+            }
+          } else if (projRefs1.nextSetBit(lowerLimit) < upperLimit
+                  && projRefs1.nextSetBit(lowerLimit) != -1) {
+            if (leftKey == null) {
+              leftKey = op1;
+              leftInput = i;
+              leftFields = inputs.get(leftInput).getRowType().getFieldList();
+            } else {
+              rightKey = op1;
+              rightInput = i;
+              rightFields = inputs.get(rightInput).getRowType().getFieldList();
+              foundBothInputs = true;
+            }
+          }
         }
 
         if ((leftKey != null) && (rightKey != null)) {
@@ -1070,33 +1122,29 @@ public abstract class RelOptUtil {
         leftKey = null;
         rightKey = null;
 
-        if (projRefs.nextSetBit(firstRightField) < 0) {
-          leftKey = condition.accept(
-              new RelOptUtil.RexInputConverter(
-                  rexBuilder,
-                  leftFields,
-                  leftFields,
-                  adjustments));
+        boolean foundInput = false;
+        for (int i = 0; i < inputs.size() && !foundInput; i++) {
+          final int lowerLimit = firstFieldInputs[i];
+          final int upperLimit = i == inputs.size() - 1
+                  ? totalFieldCount : firstFieldInputs[i + 1];
+          if (projRefs.nextSetBit(lowerLimit) < upperLimit) {
+            leftInput = i;
+            leftFields = inputs.get(leftInput).getRowType().getFieldList();
 
-          rightKey = rexBuilder.makeLiteral(true);
+            leftKey = condition.accept(
+                new RelOptUtil.RexInputConverter(
+                    rexBuilder,
+                    leftFields,
+                    leftFields,
+                    adjustments));
 
-          // effectively performing an equality comparison
-          kind = SqlKind.EQUALS;
-        } else if (projRefs.nextSetBit(firstLeftField)
-            >= firstRightField) {
-          leftKey = rexBuilder.makeLiteral(true);
+            rightKey = rexBuilder.makeLiteral(true);
 
-          // replace right Key input ref
-          rightKey =
-              condition.accept(
-                  new RelOptUtil.RexInputConverter(
-                      rexBuilder,
-                      rightFields,
-                      rightFields,
-                      adjustments));
+            // effectively performing an equality comparison
+            kind = SqlKind.EQUALS;
 
-          // effectively performing an equality comparison
-          kind = SqlKind.EQUALS;
+            foundInput = true;
+          }
         }
       }
 
@@ -1106,18 +1154,18 @@ public abstract class RelOptUtil {
         // non-equi join predicate, it appears at the end of the
         // key list; also mark the null filtering property
         addJoinKey(
-            leftJoinKeys,
+            joinKeys.get(leftInput),
             leftKey,
             (rangeOp != null) && !rangeOp.isEmpty());
         addJoinKey(
-            rightJoinKeys,
+            joinKeys.get(rightInput),
             rightKey,
             (rangeOp != null) && !rangeOp.isEmpty());
         if (filterNulls != null
             && kind == SqlKind.EQUALS) {
           // nulls are considered not matching for equality comparison
           // add the position of the most recently inserted key
-          filterNulls.add(leftJoinKeys.size() - 1);
+          filterNulls.add(joinKeys.get(leftInput).size() - 1);
         }
         if (rangeOp != null
             && kind != SqlKind.EQUALS
@@ -1420,11 +1468,11 @@ public abstract class RelOptUtil {
     int origLeftInputSize = leftRel.getRowType().getFieldCount();
     int origRightInputSize = rightRel.getRowType().getFieldCount();
 
-    List<RexNode> newLeftFields = new ArrayList<RexNode>();
-    List<String> newLeftFieldNames = new ArrayList<String>();
+    final List<RexNode> newLeftFields = new ArrayList<>();
+    final List<String> newLeftFieldNames = new ArrayList<>();
 
-    List<RexNode> newRightFields = new ArrayList<RexNode>();
-    List<String> newRightFieldNames = new ArrayList<String>();
+    final List<RexNode> newRightFields = new ArrayList<>();
+    final List<String> newRightFieldNames = new ArrayList<>();
     int leftKeyCount = leftJoinKeys.size();
     int rightKeyCount = rightJoinKeys.size();
     int i;
@@ -1519,15 +1567,13 @@ public abstract class RelOptUtil {
     // join, then no need to create a projection
     if ((newProjectOutputSize > 0)
         && (newProjectOutputSize < joinOutputFields.size())) {
-      List<Pair<RexNode, String>> newProjects =
-          new ArrayList<Pair<RexNode, String>>();
+      final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
       RexBuilder rexBuilder = joinRel.getCluster().getRexBuilder();
       for (int fieldIndex : outputProj) {
         final RelDataTypeField field = joinOutputFields.get(fieldIndex);
         newProjects.add(
-            Pair.of(
-                (RexNode) rexBuilder.makeInputRef(
-                    field.getType(), fieldIndex),
+            Pair.<RexNode, String>of(
+                rexBuilder.makeInputRef(field.getType(), fieldIndex),
                 field.getName()));
       }
 
@@ -1945,7 +1991,7 @@ public abstract class RelOptUtil {
    * {@code conjunctions(FALSE)} returns list {@code {FALSE}}.</p>
    */
   public static List<RexNode> conjunctions(RexNode rexPredicate) {
-    final List<RexNode> list = new ArrayList<RexNode>();
+    final List<RexNode> list = new ArrayList<>();
     decomposeConjunction(rexPredicate, list);
     return list;
   }
@@ -1956,7 +2002,7 @@ public abstract class RelOptUtil {
    * <p>For example, {@code disjunctions(FALSE)} returns the empty list.</p>
    */
   public static List<RexNode> disjunctions(RexNode rexPredicate) {
-    final List<RexNode> list = new ArrayList<RexNode>();
+    final List<RexNode> list = new ArrayList<>();
     decomposeDisjunction(rexPredicate, list);
     return list;
   }
@@ -2007,7 +2053,7 @@ public abstract class RelOptUtil {
     if (adjustment == 0) {
       return keys;
     }
-    List<Integer> newKeys = new ArrayList<Integer>();
+    final List<Integer> newKeys = new ArrayList<>();
     for (int key : keys) {
       newKeys.add(key + adjustment);
     }
@@ -2295,7 +2341,7 @@ public abstract class RelOptUtil {
     final List<RelDataTypeField> newJoinFields =
         newJoin.getRowType().getFieldList();
     final RexBuilder rexBuilder = newJoin.getCluster().getRexBuilder();
-    final List<RexNode> exps = new ArrayList<RexNode>();
+    final List<RexNode> exps = new ArrayList<>();
     final int nFields =
         origOrder ? origJoin.getRight().getRowType().getFieldCount()
             : origJoin.getLeft().getRowType().getFieldCount();
@@ -2421,7 +2467,7 @@ public abstract class RelOptUtil {
    */
   public static RelNode replaceInput(
       RelNode parent, int ordinal, RelNode newInput) {
-    final List<RelNode> inputs = new ArrayList<RelNode>(parent.getInputs());
+    final List<RelNode> inputs = new ArrayList<>(parent.getInputs());
     if (inputs.get(ordinal) == newInput) {
       return parent;
     }
@@ -2485,7 +2531,7 @@ public abstract class RelOptUtil {
     }
     final List<RelNode> inputs = query.getInputs();
     if (!inputs.isEmpty()) {
-      final List<RelNode> newInputs = new ArrayList<RelNode>();
+      final List<RelNode> newInputs = new ArrayList<>();
       for (RelNode input : inputs) {
         newInputs.add(replaceRecurse(input, find, replace));
       }
@@ -2708,11 +2754,12 @@ public abstract class RelOptUtil {
         return permute(rel, permutation2, null);
       }
     }
-    final List<RelDataType> outputTypeList = new ArrayList<RelDataType>();
-    final List<String> outputNameList = new ArrayList<String>();
-    final List<RexNode> exprList = new ArrayList<RexNode>();
-    final List<RexLocalRef> projectRefList = new ArrayList<RexLocalRef>();
+    final List<RelDataType> outputTypeList = new ArrayList<>();
+    final List<String> outputNameList = new ArrayList<>();
+    final List<RexNode> exprList = new ArrayList<>();
+    final List<RexLocalRef> projectRefList = new ArrayList<>();
     final List<RelDataTypeField> fields = rel.getRowType().getFieldList();
+    final RelOptCluster cluster = rel.getCluster();
     for (int i = 0; i < permutation.getTargetCount(); i++) {
       int target = permutation.getTarget(i);
       final RelDataTypeField targetField = fields.get(target);
@@ -2723,24 +2770,21 @@ public abstract class RelOptUtil {
               || (fieldNames.get(i) == null)) ? targetField.getName()
               : fieldNames.get(i));
       exprList.add(
-          rel.getCluster().getRexBuilder().makeInputRef(
-              fields.get(i).getType(),
-              i));
+          cluster.getRexBuilder().makeInputRef(fields.get(i).getType(), i));
       final int source = permutation.getSource(i);
       projectRefList.add(
           new RexLocalRef(
               source,
               fields.get(source).getType()));
     }
+    final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
     final RexProgram program =
         new RexProgram(
             rel.getRowType(),
             exprList,
             projectRefList,
             null,
-            rel.getCluster().getTypeFactory().createStructType(
-                outputTypeList,
-                outputNameList));
+            typeFactory.createStructType(outputTypeList, outputNameList));
     return LogicalCalc.create(rel, program);
   }
 
@@ -2884,7 +2928,7 @@ public abstract class RelOptUtil {
 
   /** Visitor that finds all variables used but not stopped in an expression. */
   private static class VariableSetVisitor extends RelVisitor {
-    final Set<String> variables = new HashSet<String>();
+    final Set<String> variables = new HashSet<>();
 
     // implement RelVisitor
     public void visit(
@@ -2902,7 +2946,7 @@ public abstract class RelOptUtil {
 
   /** Visitor that finds all variables used in an expression. */
   public static class VariableUsedVisitor extends RexShuttle {
-    public final Set<String> variables = new LinkedHashSet<String>();
+    public final Set<String> variables = new LinkedHashSet<>();
 
     public RexNode visitCorrelVariable(RexCorrelVariable p) {
       variables.add(p.getName());
@@ -2912,8 +2956,7 @@ public abstract class RelOptUtil {
 
   /** Shuttle that finds the set of inputs that are used. */
   public static class InputReferencedVisitor extends RexShuttle {
-    public final SortedSet<Integer> inputPosReferenced =
-        new TreeSet<Integer>();
+    public final SortedSet<Integer> inputPosReferenced = new TreeSet<>();
 
     public RexNode visitInputRef(RexInputRef inputRef) {
       inputPosReferenced.add(inputRef.getIndex());


[10/10] incubator-calcite git commit: [CALCITE-650] Add metadata for average size of a tuple in SemiJoin (Jesus Camacho Rodriguez)

Posted by jh...@apache.org.
[CALCITE-650] Add metadata for average size of a tuple in SemiJoin (Jesus Camacho Rodriguez)

Close apache/incubator-calcite#67


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/e2833a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/e2833a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/e2833a29

Branch: refs/heads/master
Commit: e2833a297cf1308fe63795ebabb52a2d66e3b863
Parents: 0f824f8
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Fri Mar 27 19:16:13 2015 +0000
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:14:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/calcite/rel/metadata/RelMdSize.java     | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e2833a29/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
index 1a42f01..27e8a0c 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Intersect;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
@@ -172,13 +173,21 @@ public class RelMdSize {
     return list.build();
   }
 
+  public List<Double> averageColumnSizes(SemiJoin rel) {
+    return averageJoinColumnSizes(rel, true);
+  }
+
   public List<Double> averageColumnSizes(Join rel) {
+    return averageJoinColumnSizes(rel, false);
+  }
+
+  private List<Double> averageJoinColumnSizes(Join rel, boolean semijoin) {
     final RelNode left = rel.getLeft();
     final RelNode right = rel.getRight();
     final List<Double> lefts =
         RelMetadataQuery.getAverageColumnSizes(left);
-    final List<Double> rights =
-        RelMetadataQuery.getAverageColumnSizes(right);
+    final List<Double> rights = semijoin
+        ? null : RelMetadataQuery.getAverageColumnSizes(right);
     if (lefts == null && rights == null) {
       return null;
     }


[09/10] incubator-calcite git commit: [CALCITE-637] Implement Avatica CloseConnection RPC (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-637] Implement Avatica CloseConnection RPC (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/18fea1f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/18fea1f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/18fea1f0

Branch: refs/heads/master
Commit: 18fea1f0f42a3bb68bb1053e37b1c4ee08769319
Parents: 0ad6019
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 16:04:28 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 avatica/pom.xml                                 |  4 ++
 .../calcite/avatica/AvaticaConnection.java      |  1 +
 .../java/org/apache/calcite/avatica/Meta.java   |  3 ++
 .../org/apache/calcite/avatica/MetaImpl.java    | 13 +++++
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 56 ++++++++++++++++++--
 .../calcite/avatica/remote/JsonService.java     |  9 ++++
 .../calcite/avatica/remote/LocalService.java    |  6 +++
 .../calcite/avatica/remote/MockJsonService.java | 16 +++++-
 .../calcite/avatica/remote/RemoteMeta.java      |  5 ++
 .../apache/calcite/avatica/remote/Service.java  | 34 ++++++++++--
 .../calcite/avatica/test/RemoteDriverTest.java  |  4 +-
 pom.xml                                         |  5 ++
 12 files changed, 145 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 3e6040c..0e58323 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index edc2887..f05907f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -154,6 +154,7 @@ public abstract class AvaticaConnection implements Connection {
       // Per specification, if onConnectionClose throws, this method will throw
       // a SQLException, but statement will still be closed.
       try {
+        meta.closeConnection(handle);
         driver.handler.onConnectionClose(this);
       } catch (RuntimeException e) {
         throw helper.createException("While closing connection", e);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index 22ea681..ecd3ee3 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -200,6 +200,9 @@ public interface Meta {
    */
   void closeStatement(StatementHandle h);
 
+  /** Close a connection */
+  void closeConnection(ConnectionHandle ch);
+
   /** Factory to create instances of {@link Meta}. */
   interface Factory {
     Meta create(List<String> args);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index d6eca46..3ebff75 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -162,6 +162,19 @@ public abstract class MetaImpl implements Meta {
     }
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    // TODO: implement
+    //
+    // lots of Calcite tests break with this simple implementation,
+    // requires investigation
+
+//    try {
+//      connection.close();
+//    } catch (SQLException e) {
+//      throw new RuntimeException(e);
+//    }
+  }
+
   public StatementHandle createStatement(ConnectionHandle ch) {
     return new StatementHandle(ch.id, connection.statementCount++, null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index f864614..24bdbda 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -20,6 +20,9 @@ import org.apache.calcite.avatica.AvaticaParameter;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.lang.reflect.Array;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
@@ -43,6 +46,9 @@ import java.util.UUID;
 
 /** Implementation of {@link Meta} upon an existing JDBC data source. */
 public class JdbcMeta implements Meta {
+
+  private static final Log LOG = LogFactory.getLog(JdbcMeta.class);
+
   /**
    * JDBC Types Mapped to Java Types
    *
@@ -417,7 +423,11 @@ public class JdbcMeta implements Meta {
       final Statement statement = conn.createStatement();
       final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(ch.id, id, null);
+      StatementHandle h = new StatementHandle(ch.id, id, null);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("created statement " + h);
+      }
+      return h;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -426,11 +436,17 @@ public class JdbcMeta implements Meta {
   @Override public void closeStatement(StatementHandle h) {
     Statement stmt = statementMap.get(h.id).statement;
     if (stmt == null) {
+      LOG.debug("client requested close unknown statement " + h);
       return;
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closing statement " + h);
+    }
     try {
-      assert stmt.getConnection() == connectionMap.get(h.connectionId);
+      boolean isOwned =
+          stmt.getConnection() == connectionMap.get(h.connectionId);
       stmt.close();
+      assert isOwned : "no connection found while closing " + h;
     } catch (SQLException e) {
       throw propagate(e);
     } finally {
@@ -438,6 +454,24 @@ public class JdbcMeta implements Meta {
     }
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    Connection conn = connectionMap.get(ch.id);
+    if (conn == null) {
+      LOG.debug("client requested close unknown connection " + ch);
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closing connection " + ch);
+    }
+    try {
+      conn.close();
+    } catch (SQLException e) {
+      throw propagate(e);
+    } finally {
+      connectionMap.remove(ch.id);
+    }
+  }
+
   private RuntimeException propagate(Throwable e) {
     if (e instanceof RuntimeException) {
       throw (RuntimeException) e;
@@ -455,8 +489,12 @@ public class JdbcMeta implements Meta {
       final PreparedStatement statement = conn.prepareStatement(sql);
       final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
-          statement.getParameterMetaData(), sql));
+      StatementHandle h = new StatementHandle(ch.id, id, signature(
+          statement.getMetaData(), statement.getParameterMetaData(), sql));
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("prepared statement " + h);
+      }
+      return h;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -471,7 +509,12 @@ public class JdbcMeta implements Meta {
       final StatementInfo info = new StatementInfo(statement);
       statementMap.put(id, info);
       info.resultSet = statement.executeQuery();
-      return JdbcResultSet.create(ch.id, id, info.resultSet);
+      MetaResultSet mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
+      if (LOG.isTraceEnabled()) {
+        StatementHandle h = new StatementHandle(ch.id, id, null);
+        LOG.trace("prepAndExec statement " + h);
+      }
+      return mrs;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -479,6 +522,9 @@ public class JdbcMeta implements Meta {
 
   public Frame fetch(StatementHandle h, List<Object> parameterValues,
       int offset, int fetchMaxRowCount) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
+    }
     final StatementInfo statementInfo = statementMap.get(h.id);
     try {
       assert statementInfo.statement.getConnection()

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 8ba08cf..dc4268b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -138,6 +138,15 @@ public abstract class JsonService implements Service {
       throw handle(e);
     }
   }
+
+  @Override
+  public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    try {
+      return decode(apply(encode(request)), CloseConnectionResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
 }
 
 // End JsonService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 89717b6..719ef1d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -196,6 +196,12 @@ public class LocalService implements Service {
         request.connectionId, request.statementId, null));
     return new CloseStatementResponse();
   }
+
+  @Override
+  public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
+    return new CloseConnectionResponse();
+  }
 }
 
 // End LocalService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
index 19e59e3..02cb191 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -36,13 +36,27 @@ public class MockJsonService extends JsonService {
   }
 
   @Override public String apply(String request) {
-    final String response = map.get(request);
+    String response = map.get(request);
+    if (response == null) {
+      response = handleCloseConnection(request);
+    }
     if (response == null) {
       throw new RuntimeException("No response for " + request);
     }
     return response;
   }
 
+  /**
+   * Special case for closeConnection because connection IDs are random.
+   * @return response if is a CloseConnectionRequest, null otherwise.
+   */
+  private static String handleCloseConnection(String request) {
+    if (request.contains("closeConnection")) {
+      return "{\"response\":\"closeConnection\"}";
+    }
+    return null;
+  }
+
   /** Factory that creates a {@code MockJsonService}. */
   public static class Factory implements Service.Factory {
     public Service create(AvaticaConnection connection) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 0a5a3a0..e72ad8b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -64,6 +64,11 @@ class RemoteMeta extends MetaImpl {
         service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    final Service.CloseConnectionResponse response =
+        service.apply(new Service.CloseConnectionRequest(ch.id));
+  }
+
   @Override public MetaResultSet getCatalogs() {
     final Service.ResultSetResponse response =
         service.apply(new Service.CatalogsRequest());

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index ea03348..c56524d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -40,6 +40,7 @@ public interface Service {
   FetchResponse apply(FetchRequest request);
   CreateStatementResponse apply(CreateStatementRequest request);
   CloseStatementResponse apply(CloseStatementRequest request);
+  CloseConnectionResponse apply(CloseConnectionRequest request);
 
   /** Factory that creates a {@code Service}. */
   interface Factory {
@@ -65,7 +66,9 @@ public interface Service {
       @JsonSubTypes.Type(value = CreateStatementRequest.class,
           name = "createStatement"),
       @JsonSubTypes.Type(value = CloseStatementRequest.class,
-          name = "closeStatement") })
+          name = "closeStatement"),
+      @JsonSubTypes.Type(value = CloseConnectionRequest.class,
+          name = "closeConnection") })
   abstract class Request {
     abstract Response accept(Service service);
   }
@@ -82,7 +85,9 @@ public interface Service {
       @JsonSubTypes.Type(value = CreateStatementResponse.class,
           name = "createStatement"),
       @JsonSubTypes.Type(value = CloseStatementResponse.class,
-          name = "closeStatement") })
+          name = "closeStatement"),
+      @JsonSubTypes.Type(value = CloseConnectionResponse.class,
+          name = "closeConnection") })
   abstract class Response {
   }
 
@@ -202,7 +207,7 @@ public interface Service {
   }
 
   /** Request for
-   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
+   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
   class PrepareAndExecuteRequest extends Request {
     public final String connectionId;
     public final String sql;
@@ -356,6 +361,29 @@ public interface Service {
     @JsonCreator
     public CloseStatementResponse() {}
   }
+
+  /** Request for
+   * {@link Meta#closeConnection(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
+  class CloseConnectionRequest extends Request {
+    public final String connectionId;
+
+    @JsonCreator
+    public CloseConnectionRequest(
+        @JsonProperty("connectionId") String connectionId) {
+      this.connectionId = connectionId;
+    }
+
+    @Override CloseConnectionResponse accept(Service service) {
+      return service.apply(this);
+    }
+  }
+
+  /** Response from
+   * {@link org.apache.calcite.avatica.remote.Service.CloseConnectionRequest}. */
+  class CloseConnectionResponse extends Response {
+    @JsonCreator
+    public CloseConnectionResponse() {}
+  }
 }
 
 // End Service.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 6713632..db4b42a 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -236,8 +236,8 @@ public class RemoteDriverTest {
         s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
     conn2.close();
     conn1.close();
-//    assertEquals("closing a connection closes the server-side connection",
-//        1, connectionMap.size());
+    assertEquals("closing a connection closes the server-side connection",
+        1, connectionMap.size());
   }
 
   private void checkStatementExecuteQuery(Connection connection)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f870774..7d26118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,6 +191,11 @@ limitations under the License.
         <version>3.2</version>
       </dependency>
       <dependency>
+        <groupId>commons-logging</groupId>
+        <artifactId>commons-logging</artifactId>
+        <version>1.1.3</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.janino</groupId>
         <artifactId>janino</artifactId>
         <version>2.7.6</version>


[08/10] incubator-calcite git commit: [CALCITE-636] Connection isolation for Avatica clients (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-636] Connection isolation for Avatica clients (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ad6019b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ad6019b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ad6019b

Branch: refs/heads/master
Commit: 0ad6019bc9efa9e6ecc36647d27e258aad3f4eaa
Parents: 208eda6
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:24:45 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 .../calcite/avatica/AvaticaConnection.java      |  24 ++--
 .../calcite/avatica/AvaticaStatement.java       |   8 +-
 .../java/org/apache/calcite/avatica/Meta.java   |  36 ++++--
 .../org/apache/calcite/avatica/MetaImpl.java    |   5 +-
 .../calcite/avatica/UnregisteredDriver.java     |   2 +-
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 120 ++++++++++++++-----
 .../calcite/avatica/jdbc/JdbcResultSet.java     |  13 +-
 .../calcite/avatica/remote/LocalService.java    |  44 ++++---
 .../calcite/avatica/remote/RemoteMeta.java      |  27 +++--
 .../apache/calcite/avatica/remote/Service.java  |  54 ++++++---
 .../calcite/avatica/test/RemoteDriverTest.java  | 101 +++++++++++++---
 .../apache/calcite/jdbc/CalciteMetaImpl.java    |  17 ++-
 12 files changed, 314 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 293cc6c..edc2887 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.TimeZone;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -57,7 +58,8 @@ public abstract class AvaticaConnection implements Connection {
   private int networkTimeout;
   private String catalog;
 
-  public final int id;
+  public final String id;
+  public final Meta.ConnectionHandle handle;
   protected final UnregisteredDriver driver;
   protected final AvaticaFactory factory;
   final String url;
@@ -70,8 +72,6 @@ public abstract class AvaticaConnection implements Connection {
   public final Map<Integer, AvaticaStatement> statementMap =
       new ConcurrentHashMap<>();
 
-  private static int nextId;
-
   /**
    * Creates an AvaticaConnection.
    *
@@ -87,7 +87,8 @@ public abstract class AvaticaConnection implements Connection {
       AvaticaFactory factory,
       String url,
       Properties info) {
-    this.id = nextId++;
+    this.id = UUID.randomUUID().toString();
+    this.handle = new Meta.ConnectionHandle(this.id);
     this.driver = driver;
     this.factory = factory;
     this.url = url;
@@ -272,12 +273,9 @@ public abstract class AvaticaConnection implements Connection {
       int resultSetConcurrency,
       int resultSetHoldability) throws SQLException {
     try {
-      // TODO: cut out round-trip to create a statement handle
       final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(id);
-      final Meta.StatementHandle h = meta.createStatement(ch);
-
-      final Meta.Signature x = meta.prepare(h, sql, -1);
-      return factory.newPreparedStatement(this, h, x, resultSetType,
+      final Meta.StatementHandle h = meta.prepare(ch, sql, -1);
+      return factory.newPreparedStatement(this, h, h.signature, resultSetType,
           resultSetConcurrency, resultSetHoldability);
     } catch (RuntimeException e) {
       throw helper.createException("while preparing SQL: " + sql, e);
@@ -440,8 +438,8 @@ public abstract class AvaticaConnection implements Connection {
   protected ResultSet prepareAndExecuteInternal(
       final AvaticaStatement statement, String sql, int maxRowCount)
       throws SQLException {
-    Meta.MetaResultSet x = meta.prepareAndExecute(statement.handle, sql,
-        maxRowCount, new Meta.PrepareCallback() {
+    Meta.MetaResultSet x = meta.prepareAndExecute(handle, sql, maxRowCount,
+        new Meta.PrepareCallback() {
           public Object getMonitor() {
             return statement;
           }
@@ -477,8 +475,8 @@ public abstract class AvaticaConnection implements Connection {
 
   protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
       throws SQLException {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(metaResultSet.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        metaResultSet.connectionId, metaResultSet.statementId, null);
     final AvaticaStatement statement = lookupStatement(h);
     return executeQueryInternal(statement, metaResultSet.signature.sanitize(),
         metaResultSet.firstFrame);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 2ad74bf..8276b07 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -92,8 +92,12 @@ public abstract class AvaticaStatement
     try {
       // In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0
       final int maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount;
-      Meta.Signature x = connection.meta.prepare(handle, sql, maxRowCount1);
-      return executeInternal(x);
+      ResultSet resultSet =
+          connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+      if (resultSet.isClosed()) {
+        return false;
+      }
+      return true;
     } catch (RuntimeException e) {
       throw connection.helper.createException("while executing SQL: " + sql, e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index cd0166c..22ea681 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -153,22 +153,22 @@ public interface Meta {
 
   /** Prepares a statement.
    *
-   * @param h Statement handle
+   * @param ch Connection handle
    * @param sql SQL query
    * @param maxRowCount Negative for no limit (different meaning than JDBC)
    * @return Signature of prepared statement
    */
-  Signature prepare(StatementHandle h, String sql, int maxRowCount);
+  StatementHandle prepare(ConnectionHandle ch, String sql, int maxRowCount);
 
   /** Prepares and executes a statement.
    *
-   * @param h Statement handle
+   * @param ch Connection handle
    * @param sql SQL query
    * @param maxRowCount Negative for no limit (different meaning than JDBC)
    * @param callback Callback to lock, clear and assign cursor
-   * @return Signature of prepared statement
+   * @return MetaResultSet containing statement ID and first frame of data
    */
-  MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+  MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
       int maxRowCount, PrepareCallback callback);
 
   /** Returns a frame of rows.
@@ -223,14 +223,16 @@ public interface Meta {
 
   /** Meta data from which a result set can be constructed. */
   class MetaResultSet {
+    public final String connectionId;
     public final int statementId;
     public final boolean ownStatement;
     public final Frame firstFrame;
     public final Signature signature;
 
-    public MetaResultSet(int statementId, boolean ownStatement,
-        Signature signature, Frame firstFrame) {
+    public MetaResultSet(String connectionId, int statementId,
+        boolean ownStatement, Signature signature, Frame firstFrame) {
       this.signature = Objects.requireNonNull(signature);
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.ownStatement = ownStatement;
       this.firstFrame = firstFrame; // may be null
@@ -430,29 +432,39 @@ public interface Meta {
 
   /** Connection handle. */
   class ConnectionHandle {
-    public final int id;
+    public final String id;
 
     @Override public String toString() {
-      return Integer.toString(id);
+      return id;
     }
 
     @JsonCreator
-    public ConnectionHandle(@JsonProperty("id") int id) {
+    public ConnectionHandle(@JsonProperty("id") String id) {
       this.id = id;
     }
   }
 
   /** Statement handle. */
   class StatementHandle {
+    public final String connectionId;
     public final int id;
 
+    // not final because LocalService#apply(PrepareRequest)
+    /** Only present for PreparedStatement handles, null otherwise. */
+    public Signature signature;
+
     @Override public String toString() {
-      return Integer.toString(id);
+      return connectionId + "::" + Integer.toString(id);
     }
 
     @JsonCreator
-    public StatementHandle(@JsonProperty("id") int id) {
+    public StatementHandle(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("id") int id,
+        @JsonProperty("signature") Signature signature) {
+      this.connectionId = connectionId;
       this.id = id;
+      this.signature = signature;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index a6c5f3c..d6eca46 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -163,7 +163,7 @@ public abstract class MetaImpl implements Meta {
   }
 
   public StatementHandle createStatement(ConnectionHandle ch) {
-    return new StatementHandle(connection.statementCount++);
+    return new StatementHandle(ch.id, connection.statementCount++, null);
   }
 
   /** Creates an empty result set. Useful for JDBC metadata methods that are
@@ -215,7 +215,8 @@ public abstract class MetaImpl implements Meta {
       final Signature signature =
           new Signature(columns, "", Collections.<AvaticaParameter>emptyList(),
               internalParameters, cursorFactory);
-      return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+      return new MetaResultSet(connection.id, statement.getId(), true,
+          signature, firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
index fefd5f4..5f8d492 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
@@ -42,7 +42,7 @@ import java.util.logging.Logger;
  *
  * <p>The provider must implement:</p>
  * <ul>
- *   <li>{@link Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}
+ *   <li>{@link Meta#prepare(Meta.ConnectionHandle, String, int)}
  *   <li>{@link Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)}
  * </ul>
  */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 72c3948..f864614 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.ParameterMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,6 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
 
 /** Implementation of {@link Meta} upon an existing JDBC data source. */
 public class JdbcMeta implements Meta {
@@ -72,6 +75,13 @@ public class JdbcMeta implements Meta {
     SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
   }
 
+  private static final String DEFAULT_CONN_ID =
+      UUID.fromString("00000000-0000-0000-0000-000000000000").toString();
+
+  private final String url;
+  private final Properties info;
+  private final Connection connection; // TODO: remove default connection
+  private final Map<String, Connection> connectionMap = new HashMap<>();
   private final Map<Integer, StatementInfo> statementMap = new HashMap<>();
 
   /**
@@ -124,7 +134,7 @@ public class JdbcMeta implements Meta {
   protected static Signature signature(ResultSetMetaData metaData,
       ParameterMetaData parameterMetaData, String sql) throws  SQLException {
     return new Signature(columns(metaData), sql, parameters(parameterMetaData),
-        null, CursorFactory.ARRAY);
+        null, CursorFactory.LIST /* LIST because JdbcResultSet#frame */);
   }
 
   protected static Signature signature(ResultSetMetaData metaData)
@@ -132,10 +142,43 @@ public class JdbcMeta implements Meta {
     return signature(metaData, null, null);
   }
 
-  protected final Connection connection;
+  /**
+   * @param url a database url of the form
+   *  <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   */
+  public JdbcMeta(String url) throws SQLException {
+    this(url, new Properties());
+  }
 
-  public JdbcMeta(Connection connection) {
-    this.connection = connection;
+  /**
+   * @param url a database url of the form
+   * <code>jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   * @param user the database user on whose behalf the connection is being
+   *   made
+   * @param password the user's password
+   */
+  public JdbcMeta(final String url, final String user, final String password)
+      throws SQLException {
+    this(url, new Properties() {
+      {
+        put("user", user);
+        put("password", password);
+      }
+    });
+  }
+
+  /**
+   * @param url a database url of the form
+   * <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   * @param info a list of arbitrary string tag/value pairs as
+   * connection arguments; normally at least a "user" and
+   * "password" property should be included
+   */
+  public JdbcMeta(String url, Properties info) throws SQLException {
+    this.url = url;
+    this.info = info;
+    this.connection = DriverManager.getConnection(url, info);
+    this.connectionMap.put(DEFAULT_CONN_ID, connection);
   }
 
   public String getSqlKeywords() {
@@ -170,7 +213,7 @@ public class JdbcMeta implements Meta {
       Pat tableNamePattern, List<String> typeList) {
     try {
       String[] types = new String[typeList == null ? 0 : typeList.size()];
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getTables(catalog, schemaPattern.s,
               tableNamePattern.s,
               typeList == null ? types : typeList.toArray(types)));
@@ -182,7 +225,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getColumns(String catalog, Pat schemaPattern,
       Pat tableNamePattern, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getColumns(catalog, schemaPattern.s,
               tableNamePattern.s, columnNamePattern.s));
     } catch (SQLException e) {
@@ -192,7 +235,7 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getSchemas(String catalog, Pat schemaPattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getSchemas(catalog, schemaPattern.s));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -201,7 +244,8 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getCatalogs() {
     try {
-      return JdbcResultSet.create(connection.getMetaData().getCatalogs());
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+          connection.getMetaData().getCatalogs());
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -209,7 +253,8 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getTableTypes() {
     try {
-      return JdbcResultSet.create(connection.getMetaData().getTableTypes());
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+          connection.getMetaData().getTableTypes());
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -218,7 +263,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getProcedures(String catalog, Pat schemaPattern,
       Pat procedureNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getProcedures(catalog, schemaPattern.s,
               procedureNamePattern.s));
     } catch (SQLException e) {
@@ -229,7 +274,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getProcedureColumns(String catalog, Pat schemaPattern,
       Pat procedureNamePattern, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getProcedureColumns(catalog,
               schemaPattern.s, procedureNamePattern.s, columnNamePattern.s));
     } catch (SQLException e) {
@@ -240,7 +285,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getColumnPrivileges(String catalog, String schema,
       String table, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getColumnPrivileges(catalog, schema,
               table, columnNamePattern.s));
     } catch (SQLException e) {
@@ -251,7 +296,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getTablePrivileges(String catalog, Pat schemaPattern,
       Pat tableNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getTablePrivileges(catalog,
               schemaPattern.s, tableNamePattern.s));
     } catch (SQLException e) {
@@ -262,7 +307,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getBestRowIdentifier(String catalog, String schema,
       String table, int scope, boolean nullable) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getBestRowIdentifier(catalog, schema,
               table, scope, nullable));
     } catch (SQLException e) {
@@ -273,7 +318,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getVersionColumns(String catalog, String schema,
       String table) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getVersionColumns(catalog, schema, table));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -283,7 +328,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getPrimaryKeys(String catalog, String schema,
       String table) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getPrimaryKeys(catalog, schema, table));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -359,12 +404,20 @@ public class JdbcMeta implements Meta {
     return null;
   }
 
+  protected Connection getConnection(String id) throws SQLException {
+    if (connectionMap.get(id) == null) {
+      connectionMap.put(id, DriverManager.getConnection(url, info));
+    }
+    return connectionMap.get(id);
+  }
+
   public StatementHandle createStatement(ConnectionHandle ch) {
     try {
-      final Statement statement = connection.createStatement();
-      final int id = statementMap.size();
+      final Connection conn = getConnection(ch.id);
+      final Statement statement = conn.createStatement();
+      final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(id);
+      return new StatementHandle(ch.id, id, null);
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -376,6 +429,7 @@ public class JdbcMeta implements Meta {
       return;
     }
     try {
+      assert stmt.getConnection() == connectionMap.get(h.connectionId);
       stmt.close();
     } catch (SQLException e) {
       throw propagate(e);
@@ -394,24 +448,30 @@ public class JdbcMeta implements Meta {
     }
   }
 
-  public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
-    // TODO: can't actually prepare an existing statement...
+  public StatementHandle prepare(ConnectionHandle ch, String sql,
+      int maxRowCount) {
     try {
-      PreparedStatement statement = connection.prepareStatement(sql);
-      statementMap.put(h.id, new StatementInfo(statement));
-      return signature(statement.getMetaData(),
-          statement.getParameterMetaData(), sql);
+      final Connection conn = getConnection(ch.id);
+      final PreparedStatement statement = conn.prepareStatement(sql);
+      final int id = System.identityHashCode(statement);
+      statementMap.put(id, new StatementInfo(statement));
+      return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
+          statement.getParameterMetaData(), sql));
     } catch (SQLException e) {
       throw propagate(e);
     }
   }
 
-  public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+  public MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
       int maxRowCount, PrepareCallback callback) {
-    final StatementInfo statementInfo = statementMap.get(h.id);
     try {
-      statementInfo.resultSet = statementInfo.statement.executeQuery(sql);
-      return JdbcResultSet.create(statementInfo.resultSet);
+      final Connection connection = getConnection(ch.id);
+      final PreparedStatement statement = connection.prepareStatement(sql);
+      final int id = System.identityHashCode(statement);
+      final StatementInfo info = new StatementInfo(statement);
+      statementMap.put(id, info);
+      info.resultSet = statement.executeQuery();
+      return JdbcResultSet.create(ch.id, id, info.resultSet);
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -421,6 +481,8 @@ public class JdbcMeta implements Meta {
       int offset, int fetchMaxRowCount) {
     final StatementInfo statementInfo = statementMap.get(h.id);
     try {
+      assert statementInfo.statement.getConnection()
+          == connectionMap.get(h.connectionId);
       if (statementInfo.resultSet == null || parameterValues != null) {
         if (statementInfo.resultSet != null) {
           statementInfo.resultSet.close();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index ac4aeb5..827f31d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -30,19 +30,20 @@ import java.util.List;
  *
  *  @see org.apache.calcite.avatica.jdbc.JdbcMeta */
 class JdbcResultSet extends Meta.MetaResultSet {
-  protected JdbcResultSet(int statementId, boolean ownStatement,
-      Meta.Signature signature, Meta.Frame firstFrame) {
-    super(statementId, ownStatement, signature, firstFrame);
+  protected JdbcResultSet(String connectionId, int statementId,
+      boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame) {
+    super(connectionId, statementId, ownStatement, signature, firstFrame);
   }
 
   /** Creates a result set. */
-  public static JdbcResultSet create(ResultSet resultSet) {
+  public static JdbcResultSet create(String connectionId, int statementId,
+      ResultSet resultSet) {
     try {
-      int id = resultSet.getStatement().hashCode();
       Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData());
       final Meta.Frame firstFrame = frame(resultSet, 0, -1);
       resultSet.close();
-      return new JdbcResultSet(id, true, sig, firstFrame);
+      return new JdbcResultSet(connectionId, statementId, true, sig,
+          firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index e302b85..89717b6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -59,6 +59,7 @@ public class LocalService implements Service {
         cursorFactory = Meta.CursorFactory.LIST;
         break;
       case MAP:
+      case LIST:
         break;
       default:
         cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames);
@@ -72,13 +73,13 @@ public class LocalService implements Service {
     if (cursorFactory != resultSet.signature.cursorFactory) {
       signature = signature.setCursorFactory(cursorFactory);
     }
-    return new ResultSetResponse(resultSet.statementId, resultSet.ownStatement,
-        signature, new Meta.Frame(0, true, list));
+    return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+        resultSet.ownStatement, signature, new Meta.Frame(0, true, list));
   }
 
   private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(resultSet.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        resultSet.connectionId, resultSet.statementId, null);
     final Iterable<Object> iterable = meta.createIterable(h,
         resultSet.signature, Collections.emptyList(), resultSet.firstFrame);
     final List<List<Object>> list = new ArrayList<>();
@@ -118,12 +119,12 @@ public class LocalService implements Service {
   }
 
   public PrepareResponse apply(PrepareRequest request) {
+    final Meta.ConnectionHandle ch =
+        new Meta.ConnectionHandle(request.connectionId);
     final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
-    Meta.Signature signature =
-        meta.prepare(h, request.sql, request.maxRowCount)
-            .setCursorFactory(Meta.CursorFactory.LIST);
+        meta.prepare(ch, request.sql, request.maxRowCount);
     if (json) {
+      Meta.Signature signature = h.signature;
       final List<ColumnMetaData> columns = new ArrayList<>();
       for (ColumnMetaData column : signature.columns) {
         switch (column.type.rep) {
@@ -146,33 +147,37 @@ public class LocalService implements Service {
       signature = new Meta.Signature(columns, signature.sql,
           signature.parameters, signature.internalParameters,
           signature.cursorFactory);
+      h.signature = signature;
     }
-    return new PrepareResponse(signature);
+    return new PrepareResponse(h);
   }
 
   public ResultSetResponse apply(PrepareAndExecuteRequest request) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
+    final Meta.ConnectionHandle ch =
+        new Meta.ConnectionHandle(request.connectionId);
     final Meta.MetaResultSet resultSet =
-        meta.prepareAndExecute(h, request.sql, request.maxRowCount,
+        meta.prepareAndExecute(ch, request.sql, request.maxRowCount,
             new Meta.PrepareCallback() {
               @Override public Object getMonitor() {
                 return LocalService.class;
               }
 
-              @Override public void clear() {}
+              @Override public void clear() {
+              }
 
               @Override public void assign(Meta.Signature signature,
-                  Meta.Frame firstFrame) {}
+                  Meta.Frame firstFrame) {
+              }
 
-              @Override public void execute() {}
+              @Override public void execute() {
+              }
             });
     return toResponse(resultSet);
   }
 
   public FetchResponse apply(FetchRequest request) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        request.connectionId, request.statementId, null);
     final Meta.Frame frame =
         meta.fetch(h, request.parameterValues, request.offset,
             request.fetchMaxRowCount);
@@ -182,12 +187,13 @@ public class LocalService implements Service {
   public CreateStatementResponse apply(CreateStatementRequest request) {
     final Meta.StatementHandle h =
         meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
-    return new CreateStatementResponse(h.id);
+    return new CreateStatementResponse(h.connectionId, h.id);
   }
 
   @Override
   public CloseStatementResponse apply(CloseStatementRequest request) {
-    meta.closeStatement(new Meta.StatementHandle(request.statementId));
+    meta.closeStatement(new Meta.StatementHandle(
+        request.connectionId, request.statementId, null));
     return new CloseStatementResponse();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e8bbfb5..0a5a3a0 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -48,19 +48,20 @@ class RemoteMeta extends MetaImpl {
       signature0 = Signature.create(columns,
           "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY);
     }
-    return new MetaResultSet(response.statementId, response.ownStatement,
-        signature0, response.firstFrame);
+    return new MetaResultSet(response.connectionId, response.statementId,
+        response.ownStatement, signature0, response.firstFrame);
   }
 
   @Override public StatementHandle createStatement(ConnectionHandle ch) {
     final Service.CreateStatementResponse response =
         service.apply(new Service.CreateStatementRequest(ch.id));
-    return new StatementHandle(response.id);
+    return new StatementHandle(response.connectionId, response.statementId,
+        null);
   }
 
   @Override public void closeStatement(StatementHandle h) {
     final Service.CloseStatementResponse response =
-        service.apply(new Service.CloseStatementRequest(h.id));
+        service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
   }
 
   @Override public MetaResultSet getCatalogs() {
@@ -99,21 +100,21 @@ class RemoteMeta extends MetaImpl {
     return toResultSet(MetaColumn.class, response);
   }
 
-  @Override public Signature prepare(StatementHandle h, String sql,
+  @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
       int maxRowCount) {
-    final Service.PrepareResponse response =
-        service.apply(new Service.PrepareRequest(h.id, sql, maxRowCount));
-    return response.signature;
+    final Service.PrepareResponse response = service.apply(
+        new Service.PrepareRequest(ch.id, sql, maxRowCount));
+    return response.statement;
   }
 
-  @Override public MetaResultSet prepareAndExecute(StatementHandle h,
+  @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
       String sql, int maxRowCount, PrepareCallback callback) {
     final Service.ResultSetResponse response;
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
-        response = service.apply(
-            new Service.PrepareAndExecuteRequest(h.id, sql, maxRowCount));
+        response = service.apply(new Service.PrepareAndExecuteRequest(
+            ch.id, sql, maxRowCount));
         callback.assign(response.signature, response.firstFrame);
       }
       callback.execute();
@@ -127,8 +128,8 @@ class RemoteMeta extends MetaImpl {
       int offset, int fetchMaxRowCount) {
     final Service.FetchResponse response =
         service.apply(
-            new Service.FetchRequest(h.id, parameterValues, offset,
-                fetchMaxRowCount));
+            new Service.FetchRequest(h.connectionId, h.id, parameterValues,
+                offset, fetchMaxRowCount));
     return response.frame;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index bbe3552..ea03348 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -180,16 +180,20 @@ public interface Service {
    * {@link Meta#getTableTypes()}
    * return this response. */
   class ResultSetResponse extends Response {
+    public final String connectionId;
     public final int statementId;
     public final boolean ownStatement;
     public final Meta.Signature signature;
     public final Meta.Frame firstFrame;
 
     @JsonCreator
-    public ResultSetResponse(@JsonProperty("statementId") int statementId,
+    public ResultSetResponse(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId,
         @JsonProperty("ownStatement") boolean ownStatement,
         @JsonProperty("signature") Meta.Signature signature,
         @JsonProperty("firstFrame") Meta.Frame firstFrame) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.ownStatement = ownStatement;
       this.signature = signature;
@@ -200,16 +204,16 @@ public interface Service {
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
   class PrepareAndExecuteRequest extends Request {
-    public final int statementId;
+    public final String connectionId;
     public final String sql;
     public final int maxRowCount;
 
     @JsonCreator
     public PrepareAndExecuteRequest(
-        @JsonProperty("statementId") int statementId,
+        @JsonProperty("connectionId") String connectionId,
         @JsonProperty("sql") String sql,
         @JsonProperty("maxRowCount") int maxRowCount) {
-      this.statementId = statementId;
+      this.connectionId = connectionId;
       this.sql = sql;
       this.maxRowCount = maxRowCount;
     }
@@ -220,17 +224,18 @@ public interface Service {
   }
 
   /** Request for
-   * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}. */
+   * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int)}. */
   class PrepareRequest extends Request {
-    public final int statementId;
+    public final String connectionId;
     public final String sql;
     public final int maxRowCount;
 
     @JsonCreator
-    public PrepareRequest(@JsonProperty("statementId") int statementId,
+    public PrepareRequest(
+        @JsonProperty("connectionId") String connectionId,
         @JsonProperty("sql") String sql,
         @JsonProperty("maxRowCount") int maxRowCount) {
-      this.statementId = statementId;
+      this.connectionId = connectionId;
       this.sql = sql;
       this.maxRowCount = maxRowCount;
     }
@@ -243,18 +248,19 @@ public interface Service {
   /** Response from
    * {@link org.apache.calcite.avatica.remote.Service.PrepareRequest}. */
   class PrepareResponse extends Response {
-    public final Meta.Signature signature;
+    public final Meta.StatementHandle statement;
 
     @JsonCreator
     public PrepareResponse(
-        @JsonProperty("signature") Meta.Signature signature) {
-      this.signature = signature;
+        @JsonProperty("statement") Meta.StatementHandle statement) {
+      this.statement = statement;
     }
   }
 
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#fetch(Meta.StatementHandle, List, int, int)}. */
   class FetchRequest extends Request {
+    public final String connectionId;
     public final int statementId;
     public final int offset;
     /** Maximum number of rows to be returned in the frame. Negative means no
@@ -265,10 +271,13 @@ public interface Service {
     public final List<Object> parameterValues;
 
     @JsonCreator
-    public FetchRequest(@JsonProperty("statementId") int statementId,
+    public FetchRequest(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId,
         @JsonProperty("parameterValues") List<Object> parameterValues,
         @JsonProperty("offset") int offset,
         @JsonProperty("fetchMaxRowCount") int fetchMaxRowCount) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.parameterValues = parameterValues;
       this.offset = offset;
@@ -294,10 +303,11 @@ public interface Service {
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#createStatement(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
   class CreateStatementRequest extends Request {
-    public final int connectionId;
+    public final String connectionId;
 
     @JsonCreator
-    public CreateStatementRequest(@JsonProperty("signature") int connectionId) {
+    public CreateStatementRequest(
+        @JsonProperty("signature") String connectionId) {
       this.connectionId = connectionId;
     }
 
@@ -309,21 +319,29 @@ public interface Service {
   /** Response from
    * {@link org.apache.calcite.avatica.remote.Service.CreateStatementRequest}. */
   class CreateStatementResponse extends Response {
-    public final int id;
+    public final String connectionId;
+    public final int statementId;
 
     @JsonCreator
-    public CreateStatementResponse(@JsonProperty("id") int id) {
-      this.id = id;
+    public CreateStatementResponse(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId) {
+      this.connectionId = connectionId;
+      this.statementId = statementId;
     }
   }
 
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#closeStatement(org.apache.calcite.avatica.Meta.StatementHandle)}. */
   class CloseStatementRequest extends Request {
+    public final String connectionId;
     public final int statementId;
 
     @JsonCreator
-    public CloseStatementRequest(@JsonProperty("id") int statementId) {
+    public CloseStatementRequest(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 9c03b3b..6713632 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.avatica.test;
 
 import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
@@ -27,6 +28,7 @@ import org.apache.calcite.avatica.remote.Service;
 
 import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
 
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -72,6 +74,11 @@ public class RemoteDriverTest {
     return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRJS);
   }
 
+  @Before
+  public void before() throws Exception {
+    QuasiRemoteJdbcServiceFactory.initService();
+  }
+
   @Test public void testRegister() throws Exception {
     final Connection connection =
         DriverManager.getConnection("jdbc:avatica:remote:");
@@ -194,7 +201,7 @@ public class RemoteDriverTest {
     try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
       Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
       Map<Integer, Statement> serverMap =
-          QuasiRemoteJdbcServiceFactory.getRemoteMap(connection);
+          QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
       assertEquals(0, clientMap.size());
       assertEquals(0, serverMap.size());
       Statement stmt = connection.createStatement();
@@ -206,6 +213,33 @@ public class RemoteDriverTest {
     }
   }
 
+  @Test public void testConnectionIsolation() throws Exception {
+    final String sql = "select * from (values (1, 'a'))";
+    Connection conn1 = ljs();
+    Connection conn2 = ljs();
+    Map<String, Connection> connectionMap =
+        QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
+            (AvaticaConnection) conn1);
+    assertEquals("should contain at least the default connection",
+        1, connectionMap.size());
+    PreparedStatement conn1stmt1 = conn1.prepareStatement(sql);
+    assertEquals(
+        "statement creation implicitly creates a connection server-side",
+        2, connectionMap.size());
+    PreparedStatement conn2stmt1 = conn2.prepareStatement(sql);
+    assertEquals(
+        "statement creation implicitly creates a connection server-side",
+        3, connectionMap.size());
+    AvaticaPreparedStatement s1 = (AvaticaPreparedStatement) conn1stmt1;
+    AvaticaPreparedStatement s2 = (AvaticaPreparedStatement) conn2stmt1;
+    assertFalse("connection id's should be unique",
+        s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
+    conn2.close();
+    conn1.close();
+//    assertEquals("closing a connection closes the server-side connection",
+//        1, connectionMap.size());
+  }
+
   private void checkStatementExecuteQuery(Connection connection)
       throws SQLException {
     final Statement statement = connection.createStatement();
@@ -278,42 +312,51 @@ public class RemoteDriverTest {
     connection.close();
   }
 
-  /** Factory that creates a service based on a local JDBC connection. */
+  /**
+   * Factory that creates a service based on a local JDBC connection.
+   */
   public static class LocalJdbcServiceFactory implements Service.Factory {
     @Override public Service create(AvaticaConnection connection) {
       try {
-        Connection connection1 =
-            DriverManager.getConnection(CONNECTION_SPEC.url,
-                CONNECTION_SPEC.username, CONNECTION_SPEC.password);
-        return new LocalService(new JdbcMeta(connection1));
+        return new LocalService(new JdbcMeta(CONNECTION_SPEC.url,
+            CONNECTION_SPEC.username, CONNECTION_SPEC.password));
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
   }
 
-  /** Factory that creates a service based on a local JDBC connection. */
+  /**
+   * Factory that creates a service based on a local JDBC connection.
+   */
   public static class QuasiRemoteJdbcServiceFactory implements Service.Factory {
-    @Override public Service create(AvaticaConnection connection) {
+
+    /** a singleton instance that is recreated for each test */
+    private static Service service;
+
+    static void initService() {
       try {
-        Connection connection1 =
-            DriverManager.getConnection(CONNECTION_SPEC.url,
-                CONNECTION_SPEC.username, CONNECTION_SPEC.password);
-        final JdbcMeta jdbcMeta = new JdbcMeta(connection1);
+        final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
+            CONNECTION_SPEC.username, CONNECTION_SPEC.password);
         final LocalService localService = new LocalService(jdbcMeta);
-        final LocalJsonService localJsonService =
-            new LocalJsonService(localService);
-        return localJsonService;
+        service = new LocalJsonService(localService);
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
 
+    @Override public Service create(AvaticaConnection connection) {
+      assert service != null;
+      return service;
+    }
+
     /**
      * Reach into the guts of a quasi-remote connection and pull out the
      * statement map from the other side.
+     * TODO: refactor tests to replace reflection with package-local access
      */
-    static Map<Integer, Statement> getRemoteMap(AvaticaConnection connection) throws Exception {
+    static Map<Integer, Statement>
+    getRemoteStatementMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
       Meta clientMeta = (Meta) metaF.get(connection);
@@ -332,6 +375,32 @@ public class RemoteDriverTest {
       jdbcMetaStatementMapF.setAccessible(true);
       return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
     }
+
+    /**
+     * Reach into the guts of a quasi-remote connection and pull out the
+     * connection map from the other side.
+     * TODO: refactor tests to replace reflection with package-local access
+     */
+    static Map<String, Connection>
+    getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
+      Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+      metaF.setAccessible(true);
+      Meta clientMeta = (Meta) metaF.get(connection);
+      Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+      remoteMetaServiceF.setAccessible(true);
+      LocalJsonService remoteMetaService = (LocalJsonService) remoteMetaServiceF.get(clientMeta);
+      Field remoteMetaServiceServiceF = remoteMetaService.getClass().getDeclaredField("service");
+      remoteMetaServiceServiceF.setAccessible(true);
+      LocalService remoteMetaServiceService =
+          (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+      Field remoteMetaServiceServiceMetaF =
+          remoteMetaServiceService.getClass().getDeclaredField("meta");
+      remoteMetaServiceServiceMetaF.setAccessible(true);
+      JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("connectionMap");
+      jdbcMetaStatementMapF.setAccessible(true);
+      return (Map<String, Connection>) jdbcMetaStatementMapF.get(serverMeta);
+    }
   }
 
   /** Information necessary to create a JDBC connection. Specify one to run

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index ad2f9cf..849ef63 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -190,7 +190,8 @@ public class CalciteMetaImpl extends MetaImpl {
               return Linq4j.asEnumerable(firstFrame.rows);
             }
           };
-      return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+      return new MetaResultSet(connection.id, statement.getId(), true,
+          signature, firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -459,16 +460,20 @@ public class CalciteMetaImpl extends MetaImpl {
     }
   }
 
-  public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
+  @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
+      int maxRowCount) {
+    final StatementHandle h = createStatement(ch);
     final CalciteConnectionImpl calciteConnection = getConnection();
     CalciteServerStatement statement = calciteConnection.server.getStatement(h);
-    return calciteConnection.parseQuery(sql, statement.createPrepareContext(),
+    calciteConnection.parseQuery(sql, statement.createPrepareContext(),
         maxRowCount);
+    return h;
   }
 
-  public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
-      int maxRowCount, PrepareCallback callback) {
+  @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
+      String sql, int maxRowCount, PrepareCallback callback) {
     final CalcitePrepare.CalciteSignature<Object> signature;
+    final StatementHandle h = createStatement(ch);
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
@@ -480,7 +485,7 @@ public class CalciteMetaImpl extends MetaImpl {
         callback.assign(signature, null);
       }
       callback.execute();
-      return new MetaResultSet(h.id, false, signature, null);
+      return new MetaResultSet(h.connectionId, h.id, false, signature, null);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }


[03/10] incubator-calcite git commit: [CALCITE-617] Check at initialization time in CachingInvocationHandler that MD provider is not null (Jesus Camacho Rodriguez)

Posted by jh...@apache.org.
[CALCITE-617] Check at initialization time in CachingInvocationHandler that MD provider is not null (Jesus Camacho Rodriguez)

Close apache/incubator-calcite#58


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/321dc430
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/321dc430
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/321dc430

Branch: refs/heads/master
Commit: 321dc4303bccd260eb94a957dbc419fe6e5c6afe
Parents: 61eea9c
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Wed Mar 11 16:55:05 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 11:37:44 2015 -0700

----------------------------------------------------------------------
 .../apache/calcite/rel/metadata/CachingRelMetadataProvider.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/321dc430/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
index a45501b..516de3a 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.rel.RelNode;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.lang.reflect.InvocationHandler;
@@ -100,7 +101,7 @@ public class CachingRelMetadataProvider implements RelMetadataProvider {
     private final Metadata metadata;
 
     public CachingInvocationHandler(Metadata metadata) {
-      this.metadata = metadata;
+      this.metadata = Preconditions.checkNotNull(metadata);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)


[05/10] incubator-calcite git commit: [CALCITE-626] Implement CloseStatement RPC (Nick Dimiduk)

Posted by jh...@apache.org.
[CALCITE-626] Implement CloseStatement RPC (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/208eda66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/208eda66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/208eda66

Branch: refs/heads/master
Commit: 208eda6666b51fa5daed80e454dfb5aee0dff652
Parents: aa7cb03
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:44:48 2015 -0700
Committer: julianhyde <jh...@apache.org>
Committed: Fri Mar 27 13:20:31 2015 -0700

----------------------------------------------------------------------
 .../calcite/avatica/AvaticaConnection.java      |  4 +-
 .../calcite/avatica/AvaticaStatement.java       |  7 +++
 .../java/org/apache/calcite/avatica/Meta.java   |  4 ++
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 14 +++++
 .../calcite/avatica/remote/JsonService.java     |  9 +++
 .../calcite/avatica/remote/LocalService.java    |  6 ++
 .../calcite/avatica/remote/RemoteMeta.java      |  5 ++
 .../apache/calcite/avatica/remote/Service.java  | 31 +++++++++-
 .../calcite/avatica/test/RemoteDriverTest.java  | 63 ++++++++++++++++++++
 .../apache/calcite/jdbc/CalciteMetaImpl.java    |  7 +++
 10 files changed, 147 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index bc8c3eb..293cc6c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 /**
@@ -66,7 +67,8 @@ public abstract class AvaticaConnection implements Connection {
   protected final AvaticaDatabaseMetaData metaData;
   public final Helper helper = Helper.INSTANCE;
   public final Map<InternalProperty, Object> properties = new HashMap<>();
-  public final Map<Integer, AvaticaStatement> statementMap = new HashMap<>();
+  public final Map<Integer, AvaticaStatement> statementMap =
+      new ConcurrentHashMap<>();
 
   private static int nextId;
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 6df3528..2ad74bf 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -142,6 +142,13 @@ public abstract class AvaticaStatement
         openResultSet = null;
         c.close();
       }
+      try {
+        // inform the server to close the resource
+        connection.meta.closeStatement(handle);
+      } finally {
+        // make sure we don't leak on our side
+        connection.statementMap.remove(handle.id);
+      }
       // If onStatementClose throws, this method will throw an exception (later
       // converted to SQLException), but this statement still gets closed.
       connection.driver.handler.onStatementClose(this);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index f7f3b80..cd0166c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -196,6 +196,10 @@ public interface Meta {
    */
   StatementHandle createStatement(ConnectionHandle ch);
 
+  /** Close a statement.
+   */
+  void closeStatement(StatementHandle h);
+
   /** Factory to create instances of {@link Meta}. */
   interface Factory {
     Meta create(List<String> args);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 56c20c4..72c3948 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -370,6 +370,20 @@ public class JdbcMeta implements Meta {
     }
   }
 
+  @Override public void closeStatement(StatementHandle h) {
+    Statement stmt = statementMap.get(h.id).statement;
+    if (stmt == null) {
+      return;
+    }
+    try {
+      stmt.close();
+    } catch (SQLException e) {
+      throw propagate(e);
+    } finally {
+      statementMap.remove(h.id);
+    }
+  }
+
   private RuntimeException propagate(Throwable e) {
     if (e instanceof RuntimeException) {
       throw (RuntimeException) e;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index cc0ca6e..8ba08cf 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -129,6 +129,15 @@ public abstract class JsonService implements Service {
       throw handle(e);
     }
   }
+
+  @Override
+  public CloseStatementResponse apply(CloseStatementRequest request) {
+    try {
+      return decode(apply(encode(request)), CloseStatementResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
 }
 
 // End JsonService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index fa7611e..e302b85 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -184,6 +184,12 @@ public class LocalService implements Service {
         meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
     return new CreateStatementResponse(h.id);
   }
+
+  @Override
+  public CloseStatementResponse apply(CloseStatementRequest request) {
+    meta.closeStatement(new Meta.StatementHandle(request.statementId));
+    return new CloseStatementResponse();
+  }
 }
 
 // End LocalService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 10439ce..e8bbfb5 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -58,6 +58,11 @@ class RemoteMeta extends MetaImpl {
     return new StatementHandle(response.id);
   }
 
+  @Override public void closeStatement(StatementHandle h) {
+    final Service.CloseStatementResponse response =
+        service.apply(new Service.CloseStatementRequest(h.id));
+  }
+
   @Override public MetaResultSet getCatalogs() {
     final Service.ResultSetResponse response =
         service.apply(new Service.CatalogsRequest());

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index da897da..bbe3552 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -39,6 +39,7 @@ public interface Service {
   ResultSetResponse apply(PrepareAndExecuteRequest request);
   FetchResponse apply(FetchRequest request);
   CreateStatementResponse apply(CreateStatementRequest request);
+  CloseStatementResponse apply(CloseStatementRequest request);
 
   /** Factory that creates a {@code Service}. */
   interface Factory {
@@ -62,7 +63,9 @@ public interface Service {
           name = "prepareAndExecute"),
       @JsonSubTypes.Type(value = FetchRequest.class, name = "fetch"),
       @JsonSubTypes.Type(value = CreateStatementRequest.class,
-          name = "createStatement") })
+          name = "createStatement"),
+      @JsonSubTypes.Type(value = CloseStatementRequest.class,
+          name = "closeStatement") })
   abstract class Request {
     abstract Response accept(Service service);
   }
@@ -77,7 +80,9 @@ public interface Service {
       @JsonSubTypes.Type(value = PrepareResponse.class, name = "prepare"),
       @JsonSubTypes.Type(value = FetchResponse.class, name = "fetch"),
       @JsonSubTypes.Type(value = CreateStatementResponse.class,
-          name = "createStatement") })
+          name = "createStatement"),
+      @JsonSubTypes.Type(value = CloseStatementResponse.class,
+          name = "closeStatement") })
   abstract class Response {
   }
 
@@ -311,6 +316,28 @@ public interface Service {
       this.id = id;
     }
   }
+
+  /** Request for
+   * {@link org.apache.calcite.avatica.Meta#closeStatement(org.apache.calcite.avatica.Meta.StatementHandle)}. */
+  class CloseStatementRequest extends Request {
+    public final int statementId;
+
+    @JsonCreator
+    public CloseStatementRequest(@JsonProperty("id") int statementId) {
+      this.statementId = statementId;
+    }
+
+    @Override CloseStatementResponse accept(Service service) {
+      return service.apply(this);
+    }
+  }
+
+  /** Response from
+   * {@link org.apache.calcite.avatica.remote.Service.CloseStatementRequest}. */
+  class CloseStatementResponse extends Response {
+    @JsonCreator
+    public CloseStatementResponse() {}
+  }
 }
 
 // End Service.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index c007f02..9c03b3b 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -17,6 +17,8 @@
 package org.apache.calcite.avatica.test;
 
 import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
 import org.apache.calcite.avatica.remote.LocalJsonService;
 import org.apache.calcite.avatica.remote.LocalService;
@@ -28,6 +30,7 @@ import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ParameterMetaData;
@@ -36,6 +39,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -186,6 +190,41 @@ public class RemoteDriverTest {
     connection.close();
   }
 
+  @Test public void testStatementLifecycle() throws Exception {
+    try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
+      Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
+      Map<Integer, Statement> serverMap =
+          QuasiRemoteJdbcServiceFactory.getRemoteMap(connection);
+      assertEquals(0, clientMap.size());
+      assertEquals(0, serverMap.size());
+      Statement stmt = connection.createStatement();
+      assertEquals(1, clientMap.size());
+      assertEquals(1, serverMap.size());
+      stmt.close();
+      assertEquals(0, clientMap.size());
+      assertEquals(0, serverMap.size());
+    }
+  }
+
+  private void checkStatementExecuteQuery(Connection connection)
+      throws SQLException {
+    final Statement statement = connection.createStatement();
+    final ResultSet resultSet =
+        statement.executeQuery("select * from (\n"
+            + "  values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)");
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("C1", metaData.getColumnName(1));
+    assertEquals("C2", metaData.getColumnName(2));
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertFalse(resultSet.next());
+    resultSet.close();
+    statement.close();
+    connection.close();
+  }
+
   @Test public void testPrepareBindExecuteFetch() throws Exception {
     checkPrepareBindExecuteFetch(ljs());
   }
@@ -269,6 +308,30 @@ public class RemoteDriverTest {
         throw new RuntimeException(e);
       }
     }
+
+    /**
+     * Reach into the guts of a quasi-remote connection and pull out the
+     * statement map from the other side.
+     */
+    static Map<Integer, Statement> getRemoteMap(AvaticaConnection connection) throws Exception {
+      Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+      metaF.setAccessible(true);
+      Meta clientMeta = (Meta) metaF.get(connection);
+      Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+      remoteMetaServiceF.setAccessible(true);
+      LocalJsonService remoteMetaService = (LocalJsonService) remoteMetaServiceF.get(clientMeta);
+      Field remoteMetaServiceServiceF = remoteMetaService.getClass().getDeclaredField("service");
+      remoteMetaServiceServiceF.setAccessible(true);
+      LocalService remoteMetaServiceService =
+          (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+      Field remoteMetaServiceServiceMetaF =
+          remoteMetaServiceService.getClass().getDeclaredField("meta");
+      remoteMetaServiceServiceMetaF.setAccessible(true);
+      JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementMap");
+      jdbcMetaStatementMapF.setAccessible(true);
+      return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
+    }
   }
 
   /** Information necessary to create a JDBC connection. Specify one to run

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index cf395f6..ad2f9cf 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -135,6 +135,13 @@ public class CalciteMetaImpl extends MetaImpl {
     return h;
   }
 
+  @Override public void closeStatement(StatementHandle h) {
+    final CalciteConnectionImpl calciteConnection = getConnection();
+    CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
+    // stmt.close(); // TODO: implement
+    calciteConnection.server.removeStatement(h);
+  }
+
   private <E> MetaResultSet createResultSet(Enumerable<E> enumerable,
       Class clazz, String... names) {
     final List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();