You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/03/28 22:23:50 UTC
[01/10] incubator-calcite git commit: [CALCITE-648] Update
ProjectMergeRule description for new naming convention (Jinfeng Ni)
Repository: incubator-calcite
Updated Branches:
refs/heads/master 9e0be8bca -> e2833a297
[CALCITE-648] Update ProjectMergeRule description for new naming convention (Jinfeng Ni)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/a24b3c1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/a24b3c1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/a24b3c1f
Branch: refs/heads/master
Commit: a24b3c1f0877db1da64108af9bceb3e7023dc049
Parents: 9e0be8b
Author: Julian Hyde <jh...@apache.org>
Authored: Thu Mar 26 19:35:34 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Mar 26 19:35:34 2015 -0700
----------------------------------------------------------------------
.../calcite/rel/rules/ProjectMergeRule.java | 23 ++++++++++----------
.../org/apache/calcite/tools/PlannerTest.java | 17 +++++++++++++--
2 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a24b3c1f/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
index 96559cf..cef654e 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectMergeRule.java
@@ -62,18 +62,17 @@ public class ProjectMergeRule extends RelOptRule {
*
* @param force Whether to always merge projects
*/
- public ProjectMergeRule(boolean force, ProjectFactory pFactory) {
+ public ProjectMergeRule(boolean force, ProjectFactory projectFactory) {
super(
operand(Project.class,
operand(Project.class, any())),
- "ProjectMergeRule" + (force ? ": force mode" : ""));
+ "ProjectMergeRule" + (force ? ":force_mode" : ""));
this.force = force;
- projectFactory = pFactory;
+ this.projectFactory = projectFactory;
}
//~ Methods ----------------------------------------------------------------
- // implement RelOptRule
public void onMatch(RelOptRuleCall call) {
Project topProject = call.rel(0);
Project bottomProject = call.rel(1);
@@ -121,11 +120,11 @@ public class ProjectMergeRule extends RelOptRule {
rexBuilder);
// create a RexProgram for the topmost project
- List<RexNode> projExprs = topProject.getProjects();
+ final List<RexNode> projects = topProject.getProjects();
RexProgram topProgram =
RexProgram.create(
bottomProject.getRowType(),
- projExprs,
+ projects,
null,
topProject.getRowType(),
rexBuilder);
@@ -139,16 +138,16 @@ public class ProjectMergeRule extends RelOptRule {
// re-expand the topmost projection expressions, now that they
// reference the children of the bottom-most project
- int nProjExprs = projExprs.size();
- List<RexNode> newProjExprs = new ArrayList<RexNode>();
- List<RexLocalRef> projList = mergedProgram.getProjectList();
- for (int i = 0; i < nProjExprs; i++) {
- newProjExprs.add(mergedProgram.expandLocalRef(projList.get(i)));
+ final int projectCount = projects.size();
+ final List<RexNode> newProjects = new ArrayList<>();
+ List<RexLocalRef> projectRefs = mergedProgram.getProjectList();
+ for (int i = 0; i < projectCount; i++) {
+ newProjects.add(mergedProgram.expandLocalRef(projectRefs.get(i)));
}
// replace the two projects with a combined projection
RelNode newProjectRel = projectFactory.createProject(
- bottomProject.getInput(), newProjExprs,
+ bottomProject.getInput(), newProjects,
topProject.getRowType().getFieldNames());
call.transformTo(newProjectRel);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/a24b3c1f/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
index 55f287c..ff6094a 100644
--- a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
@@ -38,9 +38,11 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.rules.ProjectToWindowRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.type.RelDataType;
@@ -473,7 +475,7 @@ public class PlannerTest {
FilterMergeRule.INSTANCE,
EnumerableRules.ENUMERABLE_FILTER_RULE,
EnumerableRules.ENUMERABLE_PROJECT_RULE);
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
traitDefs.add(ConventionTraitDef.INSTANCE);
traitDefs.add(RelCollationTraitDef.INSTANCE);
@@ -979,7 +981,7 @@ public class PlannerTest {
+ "order by ps.psPartkey, ps.psSupplyCost) t \n"
+ "order by t.psPartkey";
- List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+ List<RelTraitDef> traitDefs = new ArrayList<>();
traitDefs.add(ConventionTraitDef.INSTANCE);
traitDefs.add(RelCollationTraitDef.INSTANCE);
final SqlParser.Config parserConfig =
@@ -1005,6 +1007,17 @@ public class PlannerTest {
+ " LogicalProject(psPartkey=[$0], psSupplyCost=[$1])\n"
+ " EnumerableTableScan(table=[[tpch, partsupp]])\n"));
}
+
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-648">[CALCITE-649]
+ * Update ProjectMergeRule description for new naming convention</a>. */
+ @Test public void testMergeProjectForceMode() throws Exception {
+ RuleSet ruleSet =
+ RuleSets.ofList(
+ new ProjectMergeRule(true, RelFactories.DEFAULT_PROJECT_FACTORY));
+ Planner planner = getPlanner(null, Programs.of(ruleSet));
+ planner.close();
+ }
}
// End PlannerTest.java
[07/10] incubator-calcite git commit: [CALCITE-640] Avatica server
should expire stale connections/statements (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-640] Avatica server should expire stale connections/statements (Nick Dimiduk)
Close apache/incubator-calcite#65
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0f824f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0f824f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0f824f81
Branch: refs/heads/master
Commit: 0f824f819234c5d08aac88f73ba50894b8b23a54
Parents: c446e02
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:25:00 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700
----------------------------------------------------------------------
.../org/apache/calcite/avatica/server/Main.java | 14 +-
avatica/pom.xml | 4 +
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 198 ++++++++++++++++---
.../calcite/avatica/remote/JsonService.java | 2 -
.../calcite/avatica/remote/LocalService.java | 7 +-
.../calcite/avatica/remote/RemoteMeta.java | 4 +-
.../calcite/avatica/test/RemoteDriverTest.java | 39 ++--
7 files changed, 217 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
index d3a24f0..f8416d6 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -36,10 +36,16 @@ public class Main {
}
/**
- * Create and start an {@link HttpServer}.
- * @param args <br /> args[0]: the {@link Meta.Factory} class name.<br />
- * args[1:]: arguments passed along to
- * {@link Meta.Factory#create(java.util.List)}
+ * Creates and starts an {@link HttpServer}.
+ *
+ * <p>Arguments are as follows:
+ * <ul>
+ * <li>args[0]: the {@link Meta.Factory} class name
+ * <li>args[1+]: arguments passed along to
+ * {@link Meta.Factory#create(java.util.List)}
+ * </ul>
+ *
+ * @param args Command-line arguments
*/
public static HttpServer start(String[] args)
throws ClassNotFoundException, InstantiationException,
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 0e58323..0bacc4a 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 24bdbda..627a0d7 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -23,6 +23,11 @@ import org.apache.calcite.avatica.Meta;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
import java.lang.reflect.Array;
import java.lang.reflect.Type;
import java.math.BigDecimal;
@@ -43,6 +48,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
/** Implementation of {@link Meta} upon an existing JDBC data source. */
public class JdbcMeta implements Meta {
@@ -81,14 +87,64 @@ public class JdbcMeta implements Meta {
SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
}
+ //
+ // Constants for connection cache settings.
+ //
+
+ private static final String CONN_CACHE_KEY_BASE = "avatica.connectioncache";
+ /** JDBC connection property for setting connection cache concurrency level. */
+ public static final String CONN_CACHE_CONCURRENCY_KEY =
+ CONN_CACHE_KEY_BASE + ".concurrency";
+ public static final String DEFAULT_CONN_CACHE_CONCURRENCY_LEVEL = "10";
+ /** JDBC connection property for setting connection cache initial capacity. */
+ public static final String CONN_CACHE_INITIAL_CAPACITY_KEY =
+ CONN_CACHE_KEY_BASE + ".initialcapacity";
+ public static final String DEFAULT_CONN_CACHE_INITIAL_CAPACITY = "100";
+ /** JDBC connection property for setting connection cache maximum capacity. */
+ public static final String CONN_CACHE_MAX_CAPACITY_KEY =
+ CONN_CACHE_KEY_BASE + ".maxcapacity";
+ public static final String DEFAULT_CONN_CACHE_MAX_CAPACITY = "1000";
+ /** JDBC connection property for setting connection cache expiration duration. */
+ public static final String CONN_CACHE_EXPIRY_DURATION_KEY =
+ CONN_CACHE_KEY_BASE + ".expirydiration";
+ public static final String DEFAULT_CONN_CACHE_EXPIRY_DURATION = "10";
+ /** JDBC connection property for setting connection cache expiration unit. */
+ public static final String CONN_CACHE_EXPIRY_UNIT_KEY = CONN_CACHE_KEY_BASE + ".expiryunit";
+ public static final String DEFAULT_CONN_CACHE_EXPIRY_UNIT = TimeUnit.MINUTES.name();
+
+ //
+ // Constants for statement cache settings.
+ //
+
+ private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache";
+ /** JDBC connection property for setting connection cache concurrency level. */
+ public static final String STMT_CACHE_CONCURRENCY_KEY =
+ STMT_CACHE_KEY_BASE + ".concurrency";
+ public static final String DEFAULT_STMT_CACHE_CONCURRENCY_LEVEL = "100";
+ /** JDBC connection property for setting connection cache initial capacity. */
+ public static final String STMT_CACHE_INITIAL_CAPACITY_KEY =
+ STMT_CACHE_KEY_BASE + ".initialcapacity";
+ public static final String DEFAULT_STMT_CACHE_INITIAL_CAPACITY = "1000";
+ /** JDBC connection property for setting connection cache maximum capacity. */
+ public static final String STMT_CACHE_MAX_CAPACITY_KEY =
+ STMT_CACHE_KEY_BASE + ".maxcapacity";
+ public static final String DEFAULT_STMT_CACHE_MAX_CAPACITY = "10000";
+ /** JDBC connection property for setting connection cache expiration duration. */
+ public static final String STMT_CACHE_EXPIRY_DURATION_KEY =
+ STMT_CACHE_KEY_BASE + ".expirydiration";
+ public static final String DEFAULT_STMT_CACHE_EXPIRY_DURATION = "5";
+ /** JDBC connection property for setting connection cache expiration unit. */
+ public static final String STMT_CACHE_EXPIRY_UNIT_KEY = STMT_CACHE_KEY_BASE + ".expiryunit";
+ public static final String DEFAULT_STMT_CACHE_EXPIRY_UNIT = TimeUnit.MINUTES.name();
+
private static final String DEFAULT_CONN_ID =
UUID.fromString("00000000-0000-0000-0000-000000000000").toString();
private final String url;
private final Properties info;
private final Connection connection; // TODO: remove default connection
- private final Map<String, Connection> connectionMap = new HashMap<>();
- private final Map<Integer, StatementInfo> statementMap = new HashMap<>();
+ private final Cache<String, Connection> connectionCache;
+ private final Cache<Integer, StatementInfo> statementCache;
/**
* Convert from JDBC metadata to Avatica columns.
@@ -148,6 +204,58 @@ public class JdbcMeta implements Meta {
return signature(metaData, null, null);
}
+ /** Callback for {@link #connectionCache} member expiration. */
+ private class ConnectionExpiryHandler
+ implements RemovalListener<String, Connection> {
+
+ public void onRemoval(RemovalNotification<String, Connection> notification) {
+ String connectionId = notification.getKey();
+ Connection doomed = notification.getValue();
+ // is String.equals() more efficient?
+ if (notification.getValue() == connection) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expiring connection " + connectionId + " because "
+ + notification.getCause());
+ }
+ try {
+ if (doomed != null) {
+ doomed.close();
+ }
+ } catch (Throwable t) {
+ LOG.info("Exception thrown while expiring connection " + connectionId, t);
+ }
+ }
+ }
+
+ /** Callback for {@link #statementCache} member expiration. */
+ private class StatementExpiryHandler
+ implements RemovalListener<Integer, StatementInfo> {
+ public void onRemoval(RemovalNotification<Integer, StatementInfo> notification) {
+ Integer stmtId = notification.getKey();
+ StatementInfo doomed = notification.getValue();
+ if (doomed == null) {
+ // log/throw?
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expiring statement " + stmtId + " because "
+ + notification.getCause());
+ }
+ try {
+ if (doomed.resultSet != null) {
+ doomed.resultSet.close();
+ }
+ if (doomed.statement != null) {
+ doomed.statement.close();
+ }
+ } catch (Throwable t) {
+ LOG.info("Exception thrown while expiring statement " + stmtId);
+ }
+ }
+ }
+
/**
* @param url a database url of the form
* <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
@@ -184,7 +292,48 @@ public class JdbcMeta implements Meta {
this.url = url;
this.info = info;
this.connection = DriverManager.getConnection(url, info);
- this.connectionMap.put(DEFAULT_CONN_ID, connection);
+
+ int concurrencyLevel = Integer.parseInt(
+ info.getProperty(CONN_CACHE_CONCURRENCY_KEY, DEFAULT_CONN_CACHE_CONCURRENCY_LEVEL));
+ int initialCapacity = Integer.parseInt(
+ info.getProperty(CONN_CACHE_INITIAL_CAPACITY_KEY, DEFAULT_CONN_CACHE_INITIAL_CAPACITY));
+ long maxCapacity = Long.parseLong(
+ info.getProperty(CONN_CACHE_MAX_CAPACITY_KEY, DEFAULT_CONN_CACHE_MAX_CAPACITY));
+ long connectionExpiryDuration = Long.parseLong(
+ info.getProperty(CONN_CACHE_EXPIRY_DURATION_KEY, DEFAULT_CONN_CACHE_EXPIRY_DURATION));
+ TimeUnit connectionExpiryUnit = TimeUnit.valueOf(
+ info.getProperty(CONN_CACHE_EXPIRY_UNIT_KEY, DEFAULT_CONN_CACHE_EXPIRY_UNIT));
+ this.connectionCache = CacheBuilder.newBuilder()
+ .concurrencyLevel(concurrencyLevel)
+ .initialCapacity(initialCapacity)
+ .maximumSize(maxCapacity)
+ .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit)
+ .removalListener(new ConnectionExpiryHandler())
+ .build();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("instantiated connection cache: " + connectionCache.stats());
+ }
+
+ concurrencyLevel = Integer.parseInt(
+ info.getProperty(STMT_CACHE_CONCURRENCY_KEY, DEFAULT_STMT_CACHE_CONCURRENCY_LEVEL));
+ initialCapacity = Integer.parseInt(
+ info.getProperty(STMT_CACHE_INITIAL_CAPACITY_KEY, DEFAULT_STMT_CACHE_INITIAL_CAPACITY));
+ maxCapacity = Long.parseLong(
+ info.getProperty(STMT_CACHE_MAX_CAPACITY_KEY, DEFAULT_STMT_CACHE_MAX_CAPACITY));
+ connectionExpiryDuration = Long.parseLong(
+ info.getProperty(STMT_CACHE_EXPIRY_DURATION_KEY, DEFAULT_STMT_CACHE_EXPIRY_DURATION));
+ connectionExpiryUnit = TimeUnit.valueOf(
+ info.getProperty(STMT_CACHE_EXPIRY_UNIT_KEY, DEFAULT_STMT_CACHE_EXPIRY_UNIT));
+ this.statementCache = CacheBuilder.newBuilder()
+ .concurrencyLevel(concurrencyLevel)
+ .initialCapacity(initialCapacity)
+ .maximumSize(maxCapacity)
+ .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit)
+ .removalListener(new StatementExpiryHandler())
+ .build();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("instantiated statement cache: " + statementCache.stats());
+ }
}
public String getSqlKeywords() {
@@ -411,10 +560,12 @@ public class JdbcMeta implements Meta {
}
protected Connection getConnection(String id) throws SQLException {
- if (connectionMap.get(id) == null) {
- connectionMap.put(id, DriverManager.getConnection(url, info));
+ Connection conn = connectionCache.getIfPresent(id);
+ if (conn == null) {
+ conn = DriverManager.getConnection(url, info);
+ connectionCache.put(id, conn);
}
- return connectionMap.get(id);
+ return conn;
}
public StatementHandle createStatement(ConnectionHandle ch) {
@@ -422,7 +573,7 @@ public class JdbcMeta implements Meta {
final Connection conn = getConnection(ch.id);
final Statement statement = conn.createStatement();
final int id = System.identityHashCode(statement);
- statementMap.put(id, new StatementInfo(statement));
+ statementCache.put(id, new StatementInfo(statement));
StatementHandle h = new StatementHandle(ch.id, id, null);
if (LOG.isTraceEnabled()) {
LOG.trace("created statement " + h);
@@ -434,8 +585,8 @@ public class JdbcMeta implements Meta {
}
@Override public void closeStatement(StatementHandle h) {
- Statement stmt = statementMap.get(h.id).statement;
- if (stmt == null) {
+ StatementInfo info = statementCache.getIfPresent(h.id);
+ if (info == null || info.statement == null) {
LOG.debug("client requested close unknown statement " + h);
return;
}
@@ -443,19 +594,19 @@ public class JdbcMeta implements Meta {
LOG.trace("closing statement " + h);
}
try {
- boolean isOwned =
- stmt.getConnection() == connectionMap.get(h.connectionId);
- stmt.close();
- assert isOwned : "no connection found while closing " + h;
+ if (info.resultSet != null) {
+ info.resultSet.close();
+ }
+ info.statement.close();
} catch (SQLException e) {
throw propagate(e);
} finally {
- statementMap.remove(h.id);
+ statementCache.invalidate(h.id);
}
}
@Override public void closeConnection(ConnectionHandle ch) {
- Connection conn = connectionMap.get(ch.id);
+ Connection conn = connectionCache.getIfPresent(ch.id);
if (conn == null) {
LOG.debug("client requested close unknown connection " + ch);
return;
@@ -468,7 +619,7 @@ public class JdbcMeta implements Meta {
} catch (SQLException e) {
throw propagate(e);
} finally {
- connectionMap.remove(ch.id);
+ connectionCache.invalidate(ch.id);
}
}
@@ -488,9 +639,10 @@ public class JdbcMeta implements Meta {
final Connection conn = getConnection(ch.id);
final PreparedStatement statement = conn.prepareStatement(sql);
final int id = System.identityHashCode(statement);
- statementMap.put(id, new StatementInfo(statement));
- StatementHandle h = new StatementHandle(ch.id, id, signature(
- statement.getMetaData(), statement.getParameterMetaData(), sql));
+ statementCache.put(id, new StatementInfo(statement));
+ StatementHandle h = new StatementHandle(ch.id, id,
+ signature(statement.getMetaData(), statement.getParameterMetaData(),
+ sql));
if (LOG.isTraceEnabled()) {
LOG.trace("prepared statement " + h);
}
@@ -507,7 +659,7 @@ public class JdbcMeta implements Meta {
final PreparedStatement statement = connection.prepareStatement(sql);
final int id = System.identityHashCode(statement);
final StatementInfo info = new StatementInfo(statement);
- statementMap.put(id, info);
+ statementCache.put(id, info);
info.resultSet = statement.executeQuery();
MetaResultSet mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
if (LOG.isTraceEnabled()) {
@@ -525,10 +677,10 @@ public class JdbcMeta implements Meta {
if (LOG.isTraceEnabled()) {
LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
}
- final StatementInfo statementInfo = statementMap.get(h.id);
try {
- assert statementInfo.statement.getConnection()
- == connectionMap.get(h.connectionId);
+ final StatementInfo statementInfo = Objects.requireNonNull(
+ statementCache.getIfPresent(h.id),
+ "Statement not found, potentially expired. " + h);
if (statementInfo.resultSet == null || parameterValues != null) {
if (statementInfo.resultSet != null) {
statementInfo.resultSet.close();
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index dc4268b..b8d640d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -130,7 +130,6 @@ public abstract class JsonService implements Service {
}
}
- @Override
public CloseStatementResponse apply(CloseStatementRequest request) {
try {
return decode(apply(encode(request)), CloseStatementResponse.class);
@@ -139,7 +138,6 @@ public abstract class JsonService implements Service {
}
}
- @Override
public CloseConnectionResponse apply(CloseConnectionRequest request) {
try {
return decode(apply(encode(request)), CloseConnectionResponse.class);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 719ef1d..842ebd6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -190,14 +190,13 @@ public class LocalService implements Service {
return new CreateStatementResponse(h.connectionId, h.id);
}
- @Override
public CloseStatementResponse apply(CloseStatementRequest request) {
- meta.closeStatement(new Meta.StatementHandle(
- request.connectionId, request.statementId, null));
+ meta.closeStatement(
+ new Meta.StatementHandle(request.connectionId, request.statementId,
+ null));
return new CloseStatementResponse();
}
- @Override
public CloseConnectionResponse apply(CloseConnectionRequest request) {
meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
return new CloseConnectionResponse();
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e72ad8b..772321c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -118,8 +118,8 @@ class RemoteMeta extends MetaImpl {
try {
synchronized (callback.getMonitor()) {
callback.clear();
- response = service.apply(new Service.PrepareAndExecuteRequest(
- ch.id, sql, maxRowCount));
+ response = service.apply(
+ new Service.PrepareAndExecuteRequest(ch.id, sql, maxRowCount));
callback.assign(response.signature, response.firstFrame);
}
callback.execute();
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0f824f81/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index db4b42a..0f6ab8d 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -26,6 +26,8 @@ import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.remote.MockJsonService;
import org.apache.calcite.avatica.remote.Service;
+import com.google.common.cache.Cache;
+
import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
import org.junit.Before;
@@ -200,7 +202,7 @@ public class RemoteDriverTest {
@Test public void testStatementLifecycle() throws Exception {
try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
- Map<Integer, Statement> serverMap =
+ Cache<Integer, Object> serverMap =
QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
assertEquals(0, clientMap.size());
assertEquals(0, serverMap.size());
@@ -217,27 +219,29 @@ public class RemoteDriverTest {
final String sql = "select * from (values (1, 'a'))";
Connection conn1 = ljs();
Connection conn2 = ljs();
- Map<String, Connection> connectionMap =
+ Cache<String, Connection> connectionMap =
QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
(AvaticaConnection) conn1);
- assertEquals("should contain at least the default connection",
- 1, connectionMap.size());
+ assertEquals("connection cache should start empty",
+ 0, connectionMap.size());
PreparedStatement conn1stmt1 = conn1.prepareStatement(sql);
assertEquals(
"statement creation implicitly creates a connection server-side",
- 2, connectionMap.size());
+ 1, connectionMap.size());
PreparedStatement conn2stmt1 = conn2.prepareStatement(sql);
assertEquals(
"statement creation implicitly creates a connection server-side",
- 3, connectionMap.size());
+ 2, connectionMap.size());
AvaticaPreparedStatement s1 = (AvaticaPreparedStatement) conn1stmt1;
AvaticaPreparedStatement s2 = (AvaticaPreparedStatement) conn2stmt1;
assertFalse("connection id's should be unique",
s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
conn2.close();
- conn1.close();
assertEquals("closing a connection closes the server-side connection",
1, connectionMap.size());
+ conn1.close();
+ assertEquals("closing a connection closes the server-side connection",
+ 0, connectionMap.size());
}
private void checkStatementExecuteQuery(Connection connection)
@@ -318,8 +322,9 @@ public class RemoteDriverTest {
public static class LocalJdbcServiceFactory implements Service.Factory {
@Override public Service create(AvaticaConnection connection) {
try {
- return new LocalService(new JdbcMeta(CONNECTION_SPEC.url,
- CONNECTION_SPEC.username, CONNECTION_SPEC.password));
+ return new LocalService(
+ new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
+ CONNECTION_SPEC.password));
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -355,7 +360,7 @@ public class RemoteDriverTest {
* statement map from the other side.
* TODO: refactor tests to replace reflection with package-local access
*/
- static Map<Integer, Statement>
+ static Cache<Integer, Object>
getRemoteStatementMap(AvaticaConnection connection) throws Exception {
Field metaF = AvaticaConnection.class.getDeclaredField("meta");
metaF.setAccessible(true);
@@ -371,9 +376,10 @@ public class RemoteDriverTest {
remoteMetaServiceService.getClass().getDeclaredField("meta");
remoteMetaServiceServiceMetaF.setAccessible(true);
JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
- Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementMap");
+ Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementCache");
jdbcMetaStatementMapF.setAccessible(true);
- return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
+ //noinspection unchecked
+ return (Cache<Integer, Object>) jdbcMetaStatementMapF.get(serverMeta);
}
/**
@@ -381,7 +387,7 @@ public class RemoteDriverTest {
* connection map from the other side.
* TODO: refactor tests to replace reflection with package-local access
*/
- static Map<String, Connection>
+ static Cache<String, Connection>
getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
Field metaF = AvaticaConnection.class.getDeclaredField("meta");
metaF.setAccessible(true);
@@ -397,9 +403,10 @@ public class RemoteDriverTest {
remoteMetaServiceService.getClass().getDeclaredField("meta");
remoteMetaServiceServiceMetaF.setAccessible(true);
JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
- Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("connectionMap");
- jdbcMetaStatementMapF.setAccessible(true);
- return (Map<String, Connection>) jdbcMetaStatementMapF.get(serverMeta);
+ Field jdbcMetaConnectionCacheF = JdbcMeta.class.getDeclaredField("connectionCache");
+ jdbcMetaConnectionCacheF.setAccessible(true);
+ //noinspection unchecked
+ return (Cache<String, Connection>) jdbcMetaConnectionCacheF.get(serverMeta);
}
}
[06/10] incubator-calcite git commit: [CALCITE-639] Open up
permissions on avatica server components (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-639] Open up permissions on avatica server components (Nick Dimiduk)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/c446e02a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/c446e02a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/c446e02a
Branch: refs/heads/master
Commit: c446e02a4cc4f0e174c1a48cdc68773ab0ad85dd
Parents: 18fea1f
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Thu Mar 26 11:50:56 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/calcite/avatica/server/AvaticaHandler.java | 2 +-
.../java/org/apache/calcite/avatica/server/HttpServer.java | 4 ++--
.../src/main/java/org/apache/calcite/avatica/server/Main.java | 6 ++++++
3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
index 0a5258b..bb4605f 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
@@ -30,7 +30,7 @@ import javax.servlet.http.HttpServletResponse;
/**
* Jetty handler that executes Avatica JSON request-responses.
*/
-class AvaticaHandler extends AbstractHandler {
+public class AvaticaHandler extends AbstractHandler {
final JsonHandler jsonHandler;
public AvaticaHandler(Service service) {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
index f1505f2..4aa0f24 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
@@ -32,12 +32,12 @@ public class HttpServer {
private int port = -1;
private final Handler handler;
- HttpServer(int port, Handler handler) {
+ public HttpServer(int port, Handler handler) {
this.port = port;
this.handler = handler;
}
- void start() {
+ public void start() {
if (server != null) {
throw new RuntimeException("Server is already started");
} else {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c446e02a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
index dfb59cd..d3a24f0 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -35,6 +35,12 @@ public class Main {
server.join();
}
+ /**
+ * Create and start an {@link HttpServer}.
+ * @param args <br /> args[0]: the {@link Meta.Factory} class name.<br />
+ * args[1:]: arguments passed along to
+ * {@link Meta.Factory#create(java.util.List)}
+ */
public static HttpServer start(String[] args)
throws ClassNotFoundException, InstantiationException,
IllegalAccessException {
[04/10] incubator-calcite git commit: [CALCITE-644] Increase check
style line limit to 100 chars (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-644] Increase check style line limit to 100 chars (Nick Dimiduk)
Close apache/incubator-calcite#64
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/aa7cb035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/aa7cb035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/aa7cb035
Branch: refs/heads/master
Commit: aa7cb0355159a78a98573293fd67f6dc692975ae
Parents: 321dc43
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 11:00:35 2015 -0700
Committer: julianhyde <jh...@apache.org>
Committed: Fri Mar 27 13:19:05 2015 -0700
----------------------------------------------------------------------
src/main/config/checkstyle/checker.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/aa7cb035/src/main/config/checkstyle/checker.xml
----------------------------------------------------------------------
diff --git a/src/main/config/checkstyle/checker.xml b/src/main/config/checkstyle/checker.xml
index 0ff2529..5b2fe9f 100644
--- a/src/main/config/checkstyle/checker.xml
+++ b/src/main/config/checkstyle/checker.xml
@@ -226,7 +226,7 @@ limitations under the License.
<!-- Lines cannot exceed 80 chars, except if they are hyperlinks
or strings (possibly preceded by '+' and followed by say '),'. -->
<module name="LineLength">
- <property name="max" value="80"/>
+ <property name="max" value="100"/>
<property name="ignorePattern" value="^import|@see|@link|@BaseMessage|href|^[ +]*".*"[);,]*$"/>
</module>
<!-- Over time, we will revise this down -->
[02/10] incubator-calcite git commit: [CALCITE-649] Extend
splitCondition method in RelOptUtil to handle multiple joins on the same key
(Jesus Camacho Rodriguez)
Posted by jh...@apache.org.
[CALCITE-649] Extend splitCondition method in RelOptUtil to handle multiple joins on the same key (Jesus Camacho Rodriguez)
Also some code cleanup
Close apache/incubator-calcite#66
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/61eea9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/61eea9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/61eea9ce
Branch: refs/heads/master
Commit: 61eea9ce4f0b02cb84557477bb58fd38d03e9c5c
Parents: a24b3c1
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Fri Mar 27 11:35:28 2015 +0000
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 10:04:50 2015 -0700
----------------------------------------------------------------------
.../org/apache/calcite/plan/RelOptUtil.java | 277 +++++++++++--------
1 file changed, 160 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/61eea9ce/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index d09bf1f..cf68db9 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -174,7 +174,7 @@ public abstract class RelOptUtil {
if (used.size() == 0) {
return ImmutableList.of();
}
- List<String> result = new ArrayList<String>();
+ final List<String> result = new ArrayList<>();
for (String s : set) {
if (used.contains(s) && !result.contains(s)) {
result.add(s);
@@ -439,7 +439,7 @@ public abstract class RelOptUtil {
}
// for IN/NOT IN, it needs to output the fields
- final List<RexNode> exprs = new ArrayList<RexNode>();
+ final List<RexNode> exprs = new ArrayList<>();
if (subqueryType == SubqueryType.IN) {
for (int i = 0; i < keyCount; i++) {
exprs.add(rexBuilder.makeInputRef(ret, i));
@@ -502,17 +502,16 @@ public abstract class RelOptUtil {
: "rename: field count mismatch: in=" + inputType
+ ", out" + outputType;
- List<Pair<RexNode, String>> renames =
- new ArrayList<Pair<RexNode, String>>();
+ final List<Pair<RexNode, String>> renames = new ArrayList<>();
for (Pair<RelDataTypeField, RelDataTypeField> pair
: Pair.zip(inputFields, outputFields)) {
final RelDataTypeField inputField = pair.left;
final RelDataTypeField outputField = pair.right;
assert inputField.getType().equals(outputField.getType());
+ final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
renames.add(
- Pair.of(
- (RexNode) rel.getCluster().getRexBuilder().makeInputRef(
- inputField.getType(),
+ Pair.<RexNode, String>of(
+ rexBuilder.makeInputRef(inputField.getType(),
inputField.getIndex()),
outputField.getName()));
}
@@ -582,7 +581,7 @@ public abstract class RelOptUtil {
RelNode rel,
Integer[] fieldOrdinals) {
RexNode condition = null;
- RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
+ final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
RelDataType rowType = rel.getRowType();
int n;
if (fieldOrdinals != null) {
@@ -662,9 +661,9 @@ public abstract class RelOptUtil {
// nothing to do
return rel;
}
- List<RexNode> castExps =
- RexUtil.generateCastExpressions(
- rel.getCluster().getRexBuilder(), castRowType, rowType);
+ final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
+ final List<RexNode> castExps =
+ RexUtil.generateCastExpressions(rexBuilder, castRowType, rowType);
if (rename) {
// Use names and types from castRowType.
return projectFactory.createProject(
@@ -691,8 +690,8 @@ public abstract class RelOptUtil {
RelOptCluster cluster,
RelNode rel) {
// assert (rel.getRowType().getFieldCount() == 1);
- int aggCallCnt = rel.getRowType().getFieldCount();
- List<AggregateCall> aggCalls = new ArrayList<AggregateCall>();
+ final int aggCallCnt = rel.getRowType().getFieldCount();
+ final List<AggregateCall> aggCalls = new ArrayList<>();
for (int i = 0; i < aggCallCnt; i++) {
RelDataType returnType =
@@ -794,7 +793,7 @@ public abstract class RelOptUtil {
RexNode condition,
List<Integer> leftKeys,
List<Integer> rightKeys) {
- List<RexNode> nonEquiList = new ArrayList<RexNode>();
+ final List<RexNode> nonEquiList = new ArrayList<>();
splitJoinCondition(
left.getRowType().getFieldCount(),
@@ -819,9 +818,9 @@ public abstract class RelOptUtil {
RelNode left,
RelNode right,
RexNode condition) {
- final List<Integer> leftKeys = new ArrayList<Integer>();
- final List<Integer> rightKeys = new ArrayList<Integer>();
- final List<RexNode> nonEquiList = new ArrayList<RexNode>();
+ final List<Integer> leftKeys = new ArrayList<>();
+ final List<Integer> rightKeys = new ArrayList<>();
+ final List<RexNode> nonEquiList = new ArrayList<>();
splitJoinCondition(
left.getRowType().getFieldCount(),
condition,
@@ -864,29 +863,64 @@ public abstract class RelOptUtil {
List<RexNode> rightJoinKeys,
List<Integer> filterNulls,
List<SqlOperator> rangeOp) {
- List<RexNode> nonEquiList = new ArrayList<RexNode>();
+ return splitJoinCondition(
+ sysFieldList,
+ ImmutableList.of(leftRel, rightRel),
+ condition,
+ ImmutableList.of(leftJoinKeys, rightJoinKeys),
+ filterNulls,
+ rangeOp);
+ }
+
+ /**
+ * Splits out the equi-join (and optionally, a single non-equi) components
+ * of a join condition, and returns what's left. Projection might be
+ * required by the caller to provide join keys that are not direct field
+ * references.
+ *
+ * @param sysFieldList list of system fields
+ * @param inputs join inputs
+ * @param condition join condition
+ * @param joinKeys The join keys from the inputs which are equi-join
+ * keys
+ * @param filterNulls The join key positions for which null values will not
+ * match. null values only match for the "is not distinct
+ * from" condition.
+ * @param rangeOp if null, only locate equi-joins; otherwise, locate a
+ * single non-equi join predicate and return its operator
+ * in this list; join keys associated with the non-equi
+ * join predicate are at the end of the key lists
+ * returned
+ * @return What's left, never null
+ */
+ public static RexNode splitJoinCondition(
+ List<RelDataTypeField> sysFieldList,
+ List<RelNode> inputs,
+ RexNode condition,
+ List<List<RexNode>> joinKeys,
+ List<Integer> filterNulls,
+ List<SqlOperator> rangeOp) {
+ final List<RexNode> nonEquiList = new ArrayList<>();
splitJoinCondition(
sysFieldList,
- leftRel,
- rightRel,
+ inputs,
condition,
- leftJoinKeys,
- rightJoinKeys,
+ joinKeys,
filterNulls,
rangeOp,
nonEquiList);
// Convert the remainders into a list that are AND'ed together.
return RexUtil.composeConjunction(
- leftRel.getCluster().getRexBuilder(), nonEquiList, false);
+ inputs.get(0).getCluster().getRexBuilder(), nonEquiList, false);
}
public static RexNode splitCorrelatedFilterCondition(
LogicalFilter filter,
List<RexInputRef> joinKeys,
List<RexNode> correlatedJoinKeys) {
- List<RexNode> nonEquiList = new ArrayList<RexNode>();
+ final List<RexNode> nonEquiList = new ArrayList<>();
splitCorrelatedFilterCondition(
filter,
@@ -905,7 +939,7 @@ public abstract class RelOptUtil {
List<RexNode> joinKeys,
List<RexNode> correlatedJoinKeys,
boolean extractCorrelatedFieldAccess) {
- List<RexNode> nonEquiList = new ArrayList<RexNode>();
+ final List<RexNode> nonEquiList = new ArrayList<>();
splitCorrelatedFilterCondition(
filter,
@@ -922,36 +956,33 @@ public abstract class RelOptUtil {
private static void splitJoinCondition(
List<RelDataTypeField> sysFieldList,
- RelNode leftRel,
- RelNode rightRel,
+ List<RelNode> inputs,
RexNode condition,
- List<RexNode> leftJoinKeys,
- List<RexNode> rightJoinKeys,
+ List<List<RexNode>> joinKeys,
List<Integer> filterNulls,
List<SqlOperator> rangeOp,
List<RexNode> nonEquiList) {
final int sysFieldCount = sysFieldList.size();
- final int leftFieldCount = leftRel.getRowType().getFieldCount();
- final int rightFieldCount = rightRel.getRowType().getFieldCount();
- final int firstLeftField = sysFieldCount;
- final int firstRightField = sysFieldCount + leftFieldCount;
- final int totalFieldCount = firstRightField + rightFieldCount;
+ final RelOptCluster cluster = inputs.get(0).getCluster();
+ final RexBuilder rexBuilder = cluster.getRexBuilder();
+ final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
- final List<RelDataTypeField> leftFields =
- leftRel.getRowType().getFieldList();
- final List<RelDataTypeField> rightFields =
- rightRel.getRowType().getFieldList();
-
- RexBuilder rexBuilder = leftRel.getCluster().getRexBuilder();
- RelDataTypeFactory typeFactory = leftRel.getCluster().getTypeFactory();
+ int[] firstFieldInputs = new int[inputs.size()];
+ int totalFieldCount = 0;
+ for (int i = 0; i < inputs.size(); i++) {
+ firstFieldInputs[i] = totalFieldCount + sysFieldCount;
+ totalFieldCount += sysFieldCount
+ + inputs.get(i).getRowType().getFieldCount();
+ }
// adjustment array
int[] adjustments = new int[totalFieldCount];
- for (int i = firstLeftField; i < firstRightField; i++) {
- adjustments[i] = -firstLeftField;
- }
- for (int i = firstRightField; i < totalFieldCount; i++) {
- adjustments[i] = -firstRightField;
+ for (int i = 0; i < inputs.size(); i++) {
+ int limit = i == inputs.size() - 1
+ ? totalFieldCount : firstFieldInputs[i + 1];
+ for (int j = firstFieldInputs[i]; j < limit; j++) {
+ adjustments[j] = -firstFieldInputs[i];
+ }
}
if (condition instanceof RexCall) {
@@ -960,11 +991,9 @@ public abstract class RelOptUtil {
for (RexNode operand : call.getOperands()) {
splitJoinCondition(
sysFieldList,
- leftRel,
- rightRel,
+ inputs,
operand,
- leftJoinKeys,
- rightJoinKeys,
+ joinKeys,
filterNulls,
rangeOp,
nonEquiList);
@@ -974,6 +1003,10 @@ public abstract class RelOptUtil {
RexNode leftKey = null;
RexNode rightKey = null;
+ int leftInput = 0;
+ int rightInput = 0;
+ List<RelDataTypeField> leftFields = null;
+ List<RelDataTypeField> rightFields = null;
boolean reverse = false;
SqlKind kind = call.getKind();
@@ -995,18 +1028,37 @@ public abstract class RelOptUtil {
final ImmutableBitSet projRefs0 = InputFinder.bits(op0);
final ImmutableBitSet projRefs1 = InputFinder.bits(op1);
- if ((projRefs0.nextSetBit(firstRightField) < 0)
- && (projRefs1.nextSetBit(firstLeftField)
- >= firstRightField)) {
- leftKey = op0;
- rightKey = op1;
- } else if (
- (projRefs1.nextSetBit(firstRightField) < 0)
- && (projRefs0.nextSetBit(firstLeftField)
- >= firstRightField)) {
- leftKey = op1;
- rightKey = op0;
- reverse = true;
+ boolean foundBothInputs = false;
+ for (int i = 0; i < inputs.size() && !foundBothInputs; i++) {
+ final int lowerLimit = firstFieldInputs[i];
+ final int upperLimit = i == inputs.size() - 1
+ ? totalFieldCount : firstFieldInputs[i + 1];
+ if (projRefs0.nextSetBit(lowerLimit) < upperLimit
+ && projRefs0.nextSetBit(lowerLimit) != -1) {
+ if (leftKey == null) {
+ leftKey = op0;
+ leftInput = i;
+ leftFields = inputs.get(leftInput).getRowType().getFieldList();
+ } else {
+ rightKey = op0;
+ rightInput = i;
+ rightFields = inputs.get(rightInput).getRowType().getFieldList();
+ reverse = true;
+ foundBothInputs = true;
+ }
+ } else if (projRefs1.nextSetBit(lowerLimit) < upperLimit
+ && projRefs1.nextSetBit(lowerLimit) != -1) {
+ if (leftKey == null) {
+ leftKey = op1;
+ leftInput = i;
+ leftFields = inputs.get(leftInput).getRowType().getFieldList();
+ } else {
+ rightKey = op1;
+ rightInput = i;
+ rightFields = inputs.get(rightInput).getRowType().getFieldList();
+ foundBothInputs = true;
+ }
+ }
}
if ((leftKey != null) && (rightKey != null)) {
@@ -1070,33 +1122,29 @@ public abstract class RelOptUtil {
leftKey = null;
rightKey = null;
- if (projRefs.nextSetBit(firstRightField) < 0) {
- leftKey = condition.accept(
- new RelOptUtil.RexInputConverter(
- rexBuilder,
- leftFields,
- leftFields,
- adjustments));
+ boolean foundInput = false;
+ for (int i = 0; i < inputs.size() && !foundInput; i++) {
+ final int lowerLimit = firstFieldInputs[i];
+ final int upperLimit = i == inputs.size() - 1
+ ? totalFieldCount : firstFieldInputs[i + 1];
+ if (projRefs.nextSetBit(lowerLimit) < upperLimit) {
+ leftInput = i;
+ leftFields = inputs.get(leftInput).getRowType().getFieldList();
- rightKey = rexBuilder.makeLiteral(true);
+ leftKey = condition.accept(
+ new RelOptUtil.RexInputConverter(
+ rexBuilder,
+ leftFields,
+ leftFields,
+ adjustments));
- // effectively performing an equality comparison
- kind = SqlKind.EQUALS;
- } else if (projRefs.nextSetBit(firstLeftField)
- >= firstRightField) {
- leftKey = rexBuilder.makeLiteral(true);
+ rightKey = rexBuilder.makeLiteral(true);
- // replace right Key input ref
- rightKey =
- condition.accept(
- new RelOptUtil.RexInputConverter(
- rexBuilder,
- rightFields,
- rightFields,
- adjustments));
+ // effectively performing an equality comparison
+ kind = SqlKind.EQUALS;
- // effectively performing an equality comparison
- kind = SqlKind.EQUALS;
+ foundInput = true;
+ }
}
}
@@ -1106,18 +1154,18 @@ public abstract class RelOptUtil {
// non-equi join predicate, it appears at the end of the
// key list; also mark the null filtering property
addJoinKey(
- leftJoinKeys,
+ joinKeys.get(leftInput),
leftKey,
(rangeOp != null) && !rangeOp.isEmpty());
addJoinKey(
- rightJoinKeys,
+ joinKeys.get(rightInput),
rightKey,
(rangeOp != null) && !rangeOp.isEmpty());
if (filterNulls != null
&& kind == SqlKind.EQUALS) {
// nulls are considered not matching for equality comparison
// add the position of the most recently inserted key
- filterNulls.add(leftJoinKeys.size() - 1);
+ filterNulls.add(joinKeys.get(leftInput).size() - 1);
}
if (rangeOp != null
&& kind != SqlKind.EQUALS
@@ -1420,11 +1468,11 @@ public abstract class RelOptUtil {
int origLeftInputSize = leftRel.getRowType().getFieldCount();
int origRightInputSize = rightRel.getRowType().getFieldCount();
- List<RexNode> newLeftFields = new ArrayList<RexNode>();
- List<String> newLeftFieldNames = new ArrayList<String>();
+ final List<RexNode> newLeftFields = new ArrayList<>();
+ final List<String> newLeftFieldNames = new ArrayList<>();
- List<RexNode> newRightFields = new ArrayList<RexNode>();
- List<String> newRightFieldNames = new ArrayList<String>();
+ final List<RexNode> newRightFields = new ArrayList<>();
+ final List<String> newRightFieldNames = new ArrayList<>();
int leftKeyCount = leftJoinKeys.size();
int rightKeyCount = rightJoinKeys.size();
int i;
@@ -1519,15 +1567,13 @@ public abstract class RelOptUtil {
// join, then no need to create a projection
if ((newProjectOutputSize > 0)
&& (newProjectOutputSize < joinOutputFields.size())) {
- List<Pair<RexNode, String>> newProjects =
- new ArrayList<Pair<RexNode, String>>();
+ final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
RexBuilder rexBuilder = joinRel.getCluster().getRexBuilder();
for (int fieldIndex : outputProj) {
final RelDataTypeField field = joinOutputFields.get(fieldIndex);
newProjects.add(
- Pair.of(
- (RexNode) rexBuilder.makeInputRef(
- field.getType(), fieldIndex),
+ Pair.<RexNode, String>of(
+ rexBuilder.makeInputRef(field.getType(), fieldIndex),
field.getName()));
}
@@ -1945,7 +1991,7 @@ public abstract class RelOptUtil {
* {@code conjunctions(FALSE)} returns list {@code {FALSE}}.</p>
*/
public static List<RexNode> conjunctions(RexNode rexPredicate) {
- final List<RexNode> list = new ArrayList<RexNode>();
+ final List<RexNode> list = new ArrayList<>();
decomposeConjunction(rexPredicate, list);
return list;
}
@@ -1956,7 +2002,7 @@ public abstract class RelOptUtil {
* <p>For example, {@code disjunctions(FALSE)} returns the empty list.</p>
*/
public static List<RexNode> disjunctions(RexNode rexPredicate) {
- final List<RexNode> list = new ArrayList<RexNode>();
+ final List<RexNode> list = new ArrayList<>();
decomposeDisjunction(rexPredicate, list);
return list;
}
@@ -2007,7 +2053,7 @@ public abstract class RelOptUtil {
if (adjustment == 0) {
return keys;
}
- List<Integer> newKeys = new ArrayList<Integer>();
+ final List<Integer> newKeys = new ArrayList<>();
for (int key : keys) {
newKeys.add(key + adjustment);
}
@@ -2295,7 +2341,7 @@ public abstract class RelOptUtil {
final List<RelDataTypeField> newJoinFields =
newJoin.getRowType().getFieldList();
final RexBuilder rexBuilder = newJoin.getCluster().getRexBuilder();
- final List<RexNode> exps = new ArrayList<RexNode>();
+ final List<RexNode> exps = new ArrayList<>();
final int nFields =
origOrder ? origJoin.getRight().getRowType().getFieldCount()
: origJoin.getLeft().getRowType().getFieldCount();
@@ -2421,7 +2467,7 @@ public abstract class RelOptUtil {
*/
public static RelNode replaceInput(
RelNode parent, int ordinal, RelNode newInput) {
- final List<RelNode> inputs = new ArrayList<RelNode>(parent.getInputs());
+ final List<RelNode> inputs = new ArrayList<>(parent.getInputs());
if (inputs.get(ordinal) == newInput) {
return parent;
}
@@ -2485,7 +2531,7 @@ public abstract class RelOptUtil {
}
final List<RelNode> inputs = query.getInputs();
if (!inputs.isEmpty()) {
- final List<RelNode> newInputs = new ArrayList<RelNode>();
+ final List<RelNode> newInputs = new ArrayList<>();
for (RelNode input : inputs) {
newInputs.add(replaceRecurse(input, find, replace));
}
@@ -2708,11 +2754,12 @@ public abstract class RelOptUtil {
return permute(rel, permutation2, null);
}
}
- final List<RelDataType> outputTypeList = new ArrayList<RelDataType>();
- final List<String> outputNameList = new ArrayList<String>();
- final List<RexNode> exprList = new ArrayList<RexNode>();
- final List<RexLocalRef> projectRefList = new ArrayList<RexLocalRef>();
+ final List<RelDataType> outputTypeList = new ArrayList<>();
+ final List<String> outputNameList = new ArrayList<>();
+ final List<RexNode> exprList = new ArrayList<>();
+ final List<RexLocalRef> projectRefList = new ArrayList<>();
final List<RelDataTypeField> fields = rel.getRowType().getFieldList();
+ final RelOptCluster cluster = rel.getCluster();
for (int i = 0; i < permutation.getTargetCount(); i++) {
int target = permutation.getTarget(i);
final RelDataTypeField targetField = fields.get(target);
@@ -2723,24 +2770,21 @@ public abstract class RelOptUtil {
|| (fieldNames.get(i) == null)) ? targetField.getName()
: fieldNames.get(i));
exprList.add(
- rel.getCluster().getRexBuilder().makeInputRef(
- fields.get(i).getType(),
- i));
+ cluster.getRexBuilder().makeInputRef(fields.get(i).getType(), i));
final int source = permutation.getSource(i);
projectRefList.add(
new RexLocalRef(
source,
fields.get(source).getType()));
}
+ final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
final RexProgram program =
new RexProgram(
rel.getRowType(),
exprList,
projectRefList,
null,
- rel.getCluster().getTypeFactory().createStructType(
- outputTypeList,
- outputNameList));
+ typeFactory.createStructType(outputTypeList, outputNameList));
return LogicalCalc.create(rel, program);
}
@@ -2884,7 +2928,7 @@ public abstract class RelOptUtil {
/** Visitor that finds all variables used but not stopped in an expression. */
private static class VariableSetVisitor extends RelVisitor {
- final Set<String> variables = new HashSet<String>();
+ final Set<String> variables = new HashSet<>();
// implement RelVisitor
public void visit(
@@ -2902,7 +2946,7 @@ public abstract class RelOptUtil {
/** Visitor that finds all variables used in an expression. */
public static class VariableUsedVisitor extends RexShuttle {
- public final Set<String> variables = new LinkedHashSet<String>();
+ public final Set<String> variables = new LinkedHashSet<>();
public RexNode visitCorrelVariable(RexCorrelVariable p) {
variables.add(p.getName());
@@ -2912,8 +2956,7 @@ public abstract class RelOptUtil {
/** Shuttle that finds the set of inputs that are used. */
public static class InputReferencedVisitor extends RexShuttle {
- public final SortedSet<Integer> inputPosReferenced =
- new TreeSet<Integer>();
+ public final SortedSet<Integer> inputPosReferenced = new TreeSet<>();
public RexNode visitInputRef(RexInputRef inputRef) {
inputPosReferenced.add(inputRef.getIndex());
[10/10] incubator-calcite git commit: [CALCITE-650] Add metadata for
average size of a tuple in SemiJoin (Jesus Camacho Rodriguez)
Posted by jh...@apache.org.
[CALCITE-650] Add metadata for average size of a tuple in SemiJoin (Jesus Camacho Rodriguez)
Close apache/incubator-calcite#67
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/e2833a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/e2833a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/e2833a29
Branch: refs/heads/master
Commit: e2833a297cf1308fe63795ebabb52a2d66e3b863
Parents: 0f824f8
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Fri Mar 27 19:16:13 2015 +0000
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:14:19 2015 -0700
----------------------------------------------------------------------
.../org/apache/calcite/rel/metadata/RelMdSize.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/e2833a29/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
index 1a42f01..27e8a0c 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdSize.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Intersect;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Union;
@@ -172,13 +173,21 @@ public class RelMdSize {
return list.build();
}
+ public List<Double> averageColumnSizes(SemiJoin rel) {
+ return averageJoinColumnSizes(rel, true);
+ }
+
public List<Double> averageColumnSizes(Join rel) {
+ return averageJoinColumnSizes(rel, false);
+ }
+
+ private List<Double> averageJoinColumnSizes(Join rel, boolean semijoin) {
final RelNode left = rel.getLeft();
final RelNode right = rel.getRight();
final List<Double> lefts =
RelMetadataQuery.getAverageColumnSizes(left);
- final List<Double> rights =
- RelMetadataQuery.getAverageColumnSizes(right);
+ final List<Double> rights = semijoin
+ ? null : RelMetadataQuery.getAverageColumnSizes(right);
if (lefts == null && rights == null) {
return null;
}
[09/10] incubator-calcite git commit: [CALCITE-637] Implement Avatica
CloseConnection RPC (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-637] Implement Avatica CloseConnection RPC (Nick Dimiduk)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/18fea1f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/18fea1f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/18fea1f0
Branch: refs/heads/master
Commit: 18fea1f0f42a3bb68bb1053e37b1c4ee08769319
Parents: 0ad6019
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 16:04:28 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700
----------------------------------------------------------------------
avatica/pom.xml | 4 ++
.../calcite/avatica/AvaticaConnection.java | 1 +
.../java/org/apache/calcite/avatica/Meta.java | 3 ++
.../org/apache/calcite/avatica/MetaImpl.java | 13 +++++
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 56 ++++++++++++++++++--
.../calcite/avatica/remote/JsonService.java | 9 ++++
.../calcite/avatica/remote/LocalService.java | 6 +++
.../calcite/avatica/remote/MockJsonService.java | 16 +++++-
.../calcite/avatica/remote/RemoteMeta.java | 5 ++
.../apache/calcite/avatica/remote/Service.java | 34 ++++++++++--
.../calcite/avatica/test/RemoteDriverTest.java | 4 +-
pom.xml | 5 ++
12 files changed, 145 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 3e6040c..0e58323 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index edc2887..f05907f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -154,6 +154,7 @@ public abstract class AvaticaConnection implements Connection {
// Per specification, if onConnectionClose throws, this method will throw
// a SQLException, but statement will still be closed.
try {
+ meta.closeConnection(handle);
driver.handler.onConnectionClose(this);
} catch (RuntimeException e) {
throw helper.createException("While closing connection", e);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index 22ea681..ecd3ee3 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -200,6 +200,9 @@ public interface Meta {
*/
void closeStatement(StatementHandle h);
+ /** Close a connection */
+ void closeConnection(ConnectionHandle ch);
+
/** Factory to create instances of {@link Meta}. */
interface Factory {
Meta create(List<String> args);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index d6eca46..3ebff75 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -162,6 +162,19 @@ public abstract class MetaImpl implements Meta {
}
}
+ @Override public void closeConnection(ConnectionHandle ch) {
+ // TODO: implement
+ //
+ // lots of Calcite tests break with this simple implementation,
+ // requires investigation
+
+// try {
+// connection.close();
+// } catch (SQLException e) {
+// throw new RuntimeException(e);
+// }
+ }
+
public StatementHandle createStatement(ConnectionHandle ch) {
return new StatementHandle(ch.id, connection.statementCount++, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index f864614..24bdbda 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -20,6 +20,9 @@ import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import java.lang.reflect.Array;
import java.lang.reflect.Type;
import java.math.BigDecimal;
@@ -43,6 +46,9 @@ import java.util.UUID;
/** Implementation of {@link Meta} upon an existing JDBC data source. */
public class JdbcMeta implements Meta {
+
+ private static final Log LOG = LogFactory.getLog(JdbcMeta.class);
+
/**
* JDBC Types Mapped to Java Types
*
@@ -417,7 +423,11 @@ public class JdbcMeta implements Meta {
final Statement statement = conn.createStatement();
final int id = System.identityHashCode(statement);
statementMap.put(id, new StatementInfo(statement));
- return new StatementHandle(ch.id, id, null);
+ StatementHandle h = new StatementHandle(ch.id, id, null);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("created statement " + h);
+ }
+ return h;
} catch (SQLException e) {
throw propagate(e);
}
@@ -426,11 +436,17 @@ public class JdbcMeta implements Meta {
@Override public void closeStatement(StatementHandle h) {
Statement stmt = statementMap.get(h.id).statement;
if (stmt == null) {
+ LOG.debug("client requested close unknown statement " + h);
return;
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("closing statement " + h);
+ }
try {
- assert stmt.getConnection() == connectionMap.get(h.connectionId);
+ boolean isOwned =
+ stmt.getConnection() == connectionMap.get(h.connectionId);
stmt.close();
+ assert isOwned : "no connection found while closing " + h;
} catch (SQLException e) {
throw propagate(e);
} finally {
@@ -438,6 +454,24 @@ public class JdbcMeta implements Meta {
}
}
+ @Override public void closeConnection(ConnectionHandle ch) {
+ Connection conn = connectionMap.get(ch.id);
+ if (conn == null) {
+ LOG.debug("client requested close unknown connection " + ch);
+ return;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("closing connection " + ch);
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ throw propagate(e);
+ } finally {
+ connectionMap.remove(ch.id);
+ }
+ }
+
private RuntimeException propagate(Throwable e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
@@ -455,8 +489,12 @@ public class JdbcMeta implements Meta {
final PreparedStatement statement = conn.prepareStatement(sql);
final int id = System.identityHashCode(statement);
statementMap.put(id, new StatementInfo(statement));
- return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
- statement.getParameterMetaData(), sql));
+ StatementHandle h = new StatementHandle(ch.id, id, signature(
+ statement.getMetaData(), statement.getParameterMetaData(), sql));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("prepared statement " + h);
+ }
+ return h;
} catch (SQLException e) {
throw propagate(e);
}
@@ -471,7 +509,12 @@ public class JdbcMeta implements Meta {
final StatementInfo info = new StatementInfo(statement);
statementMap.put(id, info);
info.resultSet = statement.executeQuery();
- return JdbcResultSet.create(ch.id, id, info.resultSet);
+ MetaResultSet mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
+ if (LOG.isTraceEnabled()) {
+ StatementHandle h = new StatementHandle(ch.id, id, null);
+ LOG.trace("prepAndExec statement " + h);
+ }
+ return mrs;
} catch (SQLException e) {
throw propagate(e);
}
@@ -479,6 +522,9 @@ public class JdbcMeta implements Meta {
public Frame fetch(StatementHandle h, List<Object> parameterValues,
int offset, int fetchMaxRowCount) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
+ }
final StatementInfo statementInfo = statementMap.get(h.id);
try {
assert statementInfo.statement.getConnection()
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 8ba08cf..dc4268b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -138,6 +138,15 @@ public abstract class JsonService implements Service {
throw handle(e);
}
}
+
+ @Override
+ public CloseConnectionResponse apply(CloseConnectionRequest request) {
+ try {
+ return decode(apply(encode(request)), CloseConnectionResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
}
// End JsonService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 89717b6..719ef1d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -196,6 +196,12 @@ public class LocalService implements Service {
request.connectionId, request.statementId, null));
return new CloseStatementResponse();
}
+
+ @Override
+ public CloseConnectionResponse apply(CloseConnectionRequest request) {
+ meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
+ return new CloseConnectionResponse();
+ }
}
// End LocalService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
index 19e59e3..02cb191 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -36,13 +36,27 @@ public class MockJsonService extends JsonService {
}
@Override public String apply(String request) {
- final String response = map.get(request);
+ String response = map.get(request);
+ if (response == null) {
+ response = handleCloseConnection(request);
+ }
if (response == null) {
throw new RuntimeException("No response for " + request);
}
return response;
}
+ /**
+ * Special case for closeConnection because connection IDs are random.
+ * @return response if is a CloseConnectionRequest, null otherwise.
+ */
+ private static String handleCloseConnection(String request) {
+ if (request.contains("closeConnection")) {
+ return "{\"response\":\"closeConnection\"}";
+ }
+ return null;
+ }
+
/** Factory that creates a {@code MockJsonService}. */
public static class Factory implements Service.Factory {
public Service create(AvaticaConnection connection) {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 0a5a3a0..e72ad8b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -64,6 +64,11 @@ class RemoteMeta extends MetaImpl {
service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
}
+ @Override public void closeConnection(ConnectionHandle ch) {
+ final Service.CloseConnectionResponse response =
+ service.apply(new Service.CloseConnectionRequest(ch.id));
+ }
+
@Override public MetaResultSet getCatalogs() {
final Service.ResultSetResponse response =
service.apply(new Service.CatalogsRequest());
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index ea03348..c56524d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -40,6 +40,7 @@ public interface Service {
FetchResponse apply(FetchRequest request);
CreateStatementResponse apply(CreateStatementRequest request);
CloseStatementResponse apply(CloseStatementRequest request);
+ CloseConnectionResponse apply(CloseConnectionRequest request);
/** Factory that creates a {@code Service}. */
interface Factory {
@@ -65,7 +66,9 @@ public interface Service {
@JsonSubTypes.Type(value = CreateStatementRequest.class,
name = "createStatement"),
@JsonSubTypes.Type(value = CloseStatementRequest.class,
- name = "closeStatement") })
+ name = "closeStatement"),
+ @JsonSubTypes.Type(value = CloseConnectionRequest.class,
+ name = "closeConnection") })
abstract class Request {
abstract Response accept(Service service);
}
@@ -82,7 +85,9 @@ public interface Service {
@JsonSubTypes.Type(value = CreateStatementResponse.class,
name = "createStatement"),
@JsonSubTypes.Type(value = CloseStatementResponse.class,
- name = "closeStatement") })
+ name = "closeStatement"),
+ @JsonSubTypes.Type(value = CloseConnectionResponse.class,
+ name = "closeConnection") })
abstract class Response {
}
@@ -202,7 +207,7 @@ public interface Service {
}
/** Request for
- * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
+ * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
class PrepareAndExecuteRequest extends Request {
public final String connectionId;
public final String sql;
@@ -356,6 +361,29 @@ public interface Service {
@JsonCreator
public CloseStatementResponse() {}
}
+
+ /** Request for
+ * {@link Meta#closeConnection(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
+ class CloseConnectionRequest extends Request {
+ public final String connectionId;
+
+ @JsonCreator
+ public CloseConnectionRequest(
+ @JsonProperty("connectionId") String connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ @Override CloseConnectionResponse accept(Service service) {
+ return service.apply(this);
+ }
+ }
+
+ /** Response from
+ * {@link org.apache.calcite.avatica.remote.Service.CloseConnectionRequest}. */
+ class CloseConnectionResponse extends Response {
+ @JsonCreator
+ public CloseConnectionResponse() {}
+ }
}
// End Service.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 6713632..db4b42a 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -236,8 +236,8 @@ public class RemoteDriverTest {
s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
conn2.close();
conn1.close();
-// assertEquals("closing a connection closes the server-side connection",
-// 1, connectionMap.size());
+ assertEquals("closing a connection closes the server-side connection",
+ 1, connectionMap.size());
}
private void checkStatementExecuteQuery(Connection connection)
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f870774..7d26118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,6 +191,11 @@ limitations under the License.
<version>3.2</version>
</dependency>
<dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.3</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.6</version>
[08/10] incubator-calcite git commit: [CALCITE-636] Connection
isolation for Avatica clients (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-636] Connection isolation for Avatica clients (Nick Dimiduk)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ad6019b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ad6019b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ad6019b
Branch: refs/heads/master
Commit: 0ad6019bc9efa9e6ecc36647d27e258aad3f4eaa
Parents: 208eda6
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:24:45 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700
----------------------------------------------------------------------
.../calcite/avatica/AvaticaConnection.java | 24 ++--
.../calcite/avatica/AvaticaStatement.java | 8 +-
.../java/org/apache/calcite/avatica/Meta.java | 36 ++++--
.../org/apache/calcite/avatica/MetaImpl.java | 5 +-
.../calcite/avatica/UnregisteredDriver.java | 2 +-
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 120 ++++++++++++++-----
.../calcite/avatica/jdbc/JdbcResultSet.java | 13 +-
.../calcite/avatica/remote/LocalService.java | 44 ++++---
.../calcite/avatica/remote/RemoteMeta.java | 27 +++--
.../apache/calcite/avatica/remote/Service.java | 54 ++++++---
.../calcite/avatica/test/RemoteDriverTest.java | 101 +++++++++++++---
.../apache/calcite/jdbc/CalciteMetaImpl.java | 17 ++-
12 files changed, 314 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 293cc6c..edc2887 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TimeZone;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -57,7 +58,8 @@ public abstract class AvaticaConnection implements Connection {
private int networkTimeout;
private String catalog;
- public final int id;
+ public final String id;
+ public final Meta.ConnectionHandle handle;
protected final UnregisteredDriver driver;
protected final AvaticaFactory factory;
final String url;
@@ -70,8 +72,6 @@ public abstract class AvaticaConnection implements Connection {
public final Map<Integer, AvaticaStatement> statementMap =
new ConcurrentHashMap<>();
- private static int nextId;
-
/**
* Creates an AvaticaConnection.
*
@@ -87,7 +87,8 @@ public abstract class AvaticaConnection implements Connection {
AvaticaFactory factory,
String url,
Properties info) {
- this.id = nextId++;
+ this.id = UUID.randomUUID().toString();
+ this.handle = new Meta.ConnectionHandle(this.id);
this.driver = driver;
this.factory = factory;
this.url = url;
@@ -272,12 +273,9 @@ public abstract class AvaticaConnection implements Connection {
int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
try {
- // TODO: cut out round-trip to create a statement handle
final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(id);
- final Meta.StatementHandle h = meta.createStatement(ch);
-
- final Meta.Signature x = meta.prepare(h, sql, -1);
- return factory.newPreparedStatement(this, h, x, resultSetType,
+ final Meta.StatementHandle h = meta.prepare(ch, sql, -1);
+ return factory.newPreparedStatement(this, h, h.signature, resultSetType,
resultSetConcurrency, resultSetHoldability);
} catch (RuntimeException e) {
throw helper.createException("while preparing SQL: " + sql, e);
@@ -440,8 +438,8 @@ public abstract class AvaticaConnection implements Connection {
protected ResultSet prepareAndExecuteInternal(
final AvaticaStatement statement, String sql, int maxRowCount)
throws SQLException {
- Meta.MetaResultSet x = meta.prepareAndExecute(statement.handle, sql,
- maxRowCount, new Meta.PrepareCallback() {
+ Meta.MetaResultSet x = meta.prepareAndExecute(handle, sql, maxRowCount,
+ new Meta.PrepareCallback() {
public Object getMonitor() {
return statement;
}
@@ -477,8 +475,8 @@ public abstract class AvaticaConnection implements Connection {
protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
throws SQLException {
- final Meta.StatementHandle h =
- new Meta.StatementHandle(metaResultSet.statementId);
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ metaResultSet.connectionId, metaResultSet.statementId, null);
final AvaticaStatement statement = lookupStatement(h);
return executeQueryInternal(statement, metaResultSet.signature.sanitize(),
metaResultSet.firstFrame);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 2ad74bf..8276b07 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -92,8 +92,12 @@ public abstract class AvaticaStatement
try {
// In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0
final int maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount;
- Meta.Signature x = connection.meta.prepare(handle, sql, maxRowCount1);
- return executeInternal(x);
+ ResultSet resultSet =
+ connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+ if (resultSet.isClosed()) {
+ return false;
+ }
+ return true;
} catch (RuntimeException e) {
throw connection.helper.createException("while executing SQL: " + sql, e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index cd0166c..22ea681 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -153,22 +153,22 @@ public interface Meta {
/** Prepares a statement.
*
- * @param h Statement handle
+ * @param ch Connection handle
* @param sql SQL query
* @param maxRowCount Negative for no limit (different meaning than JDBC)
* @return Signature of prepared statement
*/
- Signature prepare(StatementHandle h, String sql, int maxRowCount);
+ StatementHandle prepare(ConnectionHandle ch, String sql, int maxRowCount);
/** Prepares and executes a statement.
*
- * @param h Statement handle
+ * @param ch Connection handle
* @param sql SQL query
* @param maxRowCount Negative for no limit (different meaning than JDBC)
* @param callback Callback to lock, clear and assign cursor
- * @return Signature of prepared statement
+ * @return MetaResultSet containing statement ID and first frame of data
*/
- MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+ MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
int maxRowCount, PrepareCallback callback);
/** Returns a frame of rows.
@@ -223,14 +223,16 @@ public interface Meta {
/** Meta data from which a result set can be constructed. */
class MetaResultSet {
+ public final String connectionId;
public final int statementId;
public final boolean ownStatement;
public final Frame firstFrame;
public final Signature signature;
- public MetaResultSet(int statementId, boolean ownStatement,
- Signature signature, Frame firstFrame) {
+ public MetaResultSet(String connectionId, int statementId,
+ boolean ownStatement, Signature signature, Frame firstFrame) {
this.signature = Objects.requireNonNull(signature);
+ this.connectionId = connectionId;
this.statementId = statementId;
this.ownStatement = ownStatement;
this.firstFrame = firstFrame; // may be null
@@ -430,29 +432,39 @@ public interface Meta {
/** Connection handle. */
class ConnectionHandle {
- public final int id;
+ public final String id;
@Override public String toString() {
- return Integer.toString(id);
+ return id;
}
@JsonCreator
- public ConnectionHandle(@JsonProperty("id") int id) {
+ public ConnectionHandle(@JsonProperty("id") String id) {
this.id = id;
}
}
/** Statement handle. */
class StatementHandle {
+ public final String connectionId;
public final int id;
+ // not final because LocalService#apply(PrepareRequest)
+ /** Only present for PreparedStatement handles, null otherwise. */
+ public Signature signature;
+
@Override public String toString() {
- return Integer.toString(id);
+ return connectionId + "::" + Integer.toString(id);
}
@JsonCreator
- public StatementHandle(@JsonProperty("id") int id) {
+ public StatementHandle(
+ @JsonProperty("connectionId") String connectionId,
+ @JsonProperty("id") int id,
+ @JsonProperty("signature") Signature signature) {
+ this.connectionId = connectionId;
this.id = id;
+ this.signature = signature;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index a6c5f3c..d6eca46 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -163,7 +163,7 @@ public abstract class MetaImpl implements Meta {
}
public StatementHandle createStatement(ConnectionHandle ch) {
- return new StatementHandle(connection.statementCount++);
+ return new StatementHandle(ch.id, connection.statementCount++, null);
}
/** Creates an empty result set. Useful for JDBC metadata methods that are
@@ -215,7 +215,8 @@ public abstract class MetaImpl implements Meta {
final Signature signature =
new Signature(columns, "", Collections.<AvaticaParameter>emptyList(),
internalParameters, cursorFactory);
- return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+ return new MetaResultSet(connection.id, statement.getId(), true,
+ signature, firstFrame);
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
index fefd5f4..5f8d492 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
@@ -42,7 +42,7 @@ import java.util.logging.Logger;
*
* <p>The provider must implement:</p>
* <ul>
- * <li>{@link Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}
+ * <li>{@link Meta#prepare(Meta.ConnectionHandle, String, int)}
* <li>{@link Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)}
* </ul>
*/
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 72c3948..f864614 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Array;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -37,6 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
/** Implementation of {@link Meta} upon an existing JDBC data source. */
public class JdbcMeta implements Meta {
@@ -72,6 +75,13 @@ public class JdbcMeta implements Meta {
SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
}
+ private static final String DEFAULT_CONN_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000000").toString();
+
+ private final String url;
+ private final Properties info;
+ private final Connection connection; // TODO: remove default connection
+ private final Map<String, Connection> connectionMap = new HashMap<>();
private final Map<Integer, StatementInfo> statementMap = new HashMap<>();
/**
@@ -124,7 +134,7 @@ public class JdbcMeta implements Meta {
protected static Signature signature(ResultSetMetaData metaData,
ParameterMetaData parameterMetaData, String sql) throws SQLException {
return new Signature(columns(metaData), sql, parameters(parameterMetaData),
- null, CursorFactory.ARRAY);
+ null, CursorFactory.LIST /* LIST because JdbcResultSet#frame */);
}
protected static Signature signature(ResultSetMetaData metaData)
@@ -132,10 +142,43 @@ public class JdbcMeta implements Meta {
return signature(metaData, null, null);
}
- protected final Connection connection;
+ /**
+ * @param url a database url of the form
+ * <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+ */
+ public JdbcMeta(String url) throws SQLException {
+ this(url, new Properties());
+ }
- public JdbcMeta(Connection connection) {
- this.connection = connection;
+ /**
+ * @param url a database url of the form
+ * <code>jdbc:<em>subprotocol</em>:<em>subname</em></code>
+ * @param user the database user on whose behalf the connection is being
+ * made
+ * @param password the user's password
+ */
+ public JdbcMeta(final String url, final String user, final String password)
+ throws SQLException {
+ this(url, new Properties() {
+ {
+ put("user", user);
+ put("password", password);
+ }
+ });
+ }
+
+ /**
+ * @param url a database url of the form
+ * <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+ * @param info a list of arbitrary string tag/value pairs as
+ * connection arguments; normally at least a "user" and
+ * "password" property should be included
+ */
+ public JdbcMeta(String url, Properties info) throws SQLException {
+ this.url = url;
+ this.info = info;
+ this.connection = DriverManager.getConnection(url, info);
+ this.connectionMap.put(DEFAULT_CONN_ID, connection);
}
public String getSqlKeywords() {
@@ -170,7 +213,7 @@ public class JdbcMeta implements Meta {
Pat tableNamePattern, List<String> typeList) {
try {
String[] types = new String[typeList == null ? 0 : typeList.size()];
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getTables(catalog, schemaPattern.s,
tableNamePattern.s,
typeList == null ? types : typeList.toArray(types)));
@@ -182,7 +225,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getColumns(String catalog, Pat schemaPattern,
Pat tableNamePattern, Pat columnNamePattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getColumns(catalog, schemaPattern.s,
tableNamePattern.s, columnNamePattern.s));
} catch (SQLException e) {
@@ -192,7 +235,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getSchemas(String catalog, Pat schemaPattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getSchemas(catalog, schemaPattern.s));
} catch (SQLException e) {
throw new RuntimeException(e);
@@ -201,7 +244,8 @@ public class JdbcMeta implements Meta {
public MetaResultSet getCatalogs() {
try {
- return JdbcResultSet.create(connection.getMetaData().getCatalogs());
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+ connection.getMetaData().getCatalogs());
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -209,7 +253,8 @@ public class JdbcMeta implements Meta {
public MetaResultSet getTableTypes() {
try {
- return JdbcResultSet.create(connection.getMetaData().getTableTypes());
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+ connection.getMetaData().getTableTypes());
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -218,7 +263,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getProcedures(String catalog, Pat schemaPattern,
Pat procedureNamePattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getProcedures(catalog, schemaPattern.s,
procedureNamePattern.s));
} catch (SQLException e) {
@@ -229,7 +274,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getProcedureColumns(String catalog, Pat schemaPattern,
Pat procedureNamePattern, Pat columnNamePattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getProcedureColumns(catalog,
schemaPattern.s, procedureNamePattern.s, columnNamePattern.s));
} catch (SQLException e) {
@@ -240,7 +285,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getColumnPrivileges(String catalog, String schema,
String table, Pat columnNamePattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getColumnPrivileges(catalog, schema,
table, columnNamePattern.s));
} catch (SQLException e) {
@@ -251,7 +296,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getTablePrivileges(String catalog, Pat schemaPattern,
Pat tableNamePattern) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getTablePrivileges(catalog,
schemaPattern.s, tableNamePattern.s));
} catch (SQLException e) {
@@ -262,7 +307,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getBestRowIdentifier(String catalog, String schema,
String table, int scope, boolean nullable) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getBestRowIdentifier(catalog, schema,
table, scope, nullable));
} catch (SQLException e) {
@@ -273,7 +318,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getVersionColumns(String catalog, String schema,
String table) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getVersionColumns(catalog, schema, table));
} catch (SQLException e) {
throw new RuntimeException(e);
@@ -283,7 +328,7 @@ public class JdbcMeta implements Meta {
public MetaResultSet getPrimaryKeys(String catalog, String schema,
String table) {
try {
- return JdbcResultSet.create(
+ return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
connection.getMetaData().getPrimaryKeys(catalog, schema, table));
} catch (SQLException e) {
throw new RuntimeException(e);
@@ -359,12 +404,20 @@ public class JdbcMeta implements Meta {
return null;
}
+ protected Connection getConnection(String id) throws SQLException {
+ if (connectionMap.get(id) == null) {
+ connectionMap.put(id, DriverManager.getConnection(url, info));
+ }
+ return connectionMap.get(id);
+ }
+
public StatementHandle createStatement(ConnectionHandle ch) {
try {
- final Statement statement = connection.createStatement();
- final int id = statementMap.size();
+ final Connection conn = getConnection(ch.id);
+ final Statement statement = conn.createStatement();
+ final int id = System.identityHashCode(statement);
statementMap.put(id, new StatementInfo(statement));
- return new StatementHandle(id);
+ return new StatementHandle(ch.id, id, null);
} catch (SQLException e) {
throw propagate(e);
}
@@ -376,6 +429,7 @@ public class JdbcMeta implements Meta {
return;
}
try {
+ assert stmt.getConnection() == connectionMap.get(h.connectionId);
stmt.close();
} catch (SQLException e) {
throw propagate(e);
@@ -394,24 +448,30 @@ public class JdbcMeta implements Meta {
}
}
- public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
- // TODO: can't actually prepare an existing statement...
+ public StatementHandle prepare(ConnectionHandle ch, String sql,
+ int maxRowCount) {
try {
- PreparedStatement statement = connection.prepareStatement(sql);
- statementMap.put(h.id, new StatementInfo(statement));
- return signature(statement.getMetaData(),
- statement.getParameterMetaData(), sql);
+ final Connection conn = getConnection(ch.id);
+ final PreparedStatement statement = conn.prepareStatement(sql);
+ final int id = System.identityHashCode(statement);
+ statementMap.put(id, new StatementInfo(statement));
+ return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
+ statement.getParameterMetaData(), sql));
} catch (SQLException e) {
throw propagate(e);
}
}
- public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+ public MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
int maxRowCount, PrepareCallback callback) {
- final StatementInfo statementInfo = statementMap.get(h.id);
try {
- statementInfo.resultSet = statementInfo.statement.executeQuery(sql);
- return JdbcResultSet.create(statementInfo.resultSet);
+ final Connection connection = getConnection(ch.id);
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ final int id = System.identityHashCode(statement);
+ final StatementInfo info = new StatementInfo(statement);
+ statementMap.put(id, info);
+ info.resultSet = statement.executeQuery();
+ return JdbcResultSet.create(ch.id, id, info.resultSet);
} catch (SQLException e) {
throw propagate(e);
}
@@ -421,6 +481,8 @@ public class JdbcMeta implements Meta {
int offset, int fetchMaxRowCount) {
final StatementInfo statementInfo = statementMap.get(h.id);
try {
+ assert statementInfo.statement.getConnection()
+ == connectionMap.get(h.connectionId);
if (statementInfo.resultSet == null || parameterValues != null) {
if (statementInfo.resultSet != null) {
statementInfo.resultSet.close();
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index ac4aeb5..827f31d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -30,19 +30,20 @@ import java.util.List;
*
* @see org.apache.calcite.avatica.jdbc.JdbcMeta */
class JdbcResultSet extends Meta.MetaResultSet {
- protected JdbcResultSet(int statementId, boolean ownStatement,
- Meta.Signature signature, Meta.Frame firstFrame) {
- super(statementId, ownStatement, signature, firstFrame);
+ protected JdbcResultSet(String connectionId, int statementId,
+ boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame) {
+ super(connectionId, statementId, ownStatement, signature, firstFrame);
}
/** Creates a result set. */
- public static JdbcResultSet create(ResultSet resultSet) {
+ public static JdbcResultSet create(String connectionId, int statementId,
+ ResultSet resultSet) {
try {
- int id = resultSet.getStatement().hashCode();
Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData());
final Meta.Frame firstFrame = frame(resultSet, 0, -1);
resultSet.close();
- return new JdbcResultSet(id, true, sig, firstFrame);
+ return new JdbcResultSet(connectionId, statementId, true, sig,
+ firstFrame);
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index e302b85..89717b6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -59,6 +59,7 @@ public class LocalService implements Service {
cursorFactory = Meta.CursorFactory.LIST;
break;
case MAP:
+ case LIST:
break;
default:
cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames);
@@ -72,13 +73,13 @@ public class LocalService implements Service {
if (cursorFactory != resultSet.signature.cursorFactory) {
signature = signature.setCursorFactory(cursorFactory);
}
- return new ResultSetResponse(resultSet.statementId, resultSet.ownStatement,
- signature, new Meta.Frame(0, true, list));
+ return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+ resultSet.ownStatement, signature, new Meta.Frame(0, true, list));
}
private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
- final Meta.StatementHandle h =
- new Meta.StatementHandle(resultSet.statementId);
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ resultSet.connectionId, resultSet.statementId, null);
final Iterable<Object> iterable = meta.createIterable(h,
resultSet.signature, Collections.emptyList(), resultSet.firstFrame);
final List<List<Object>> list = new ArrayList<>();
@@ -118,12 +119,12 @@ public class LocalService implements Service {
}
public PrepareResponse apply(PrepareRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
final Meta.StatementHandle h =
- new Meta.StatementHandle(request.statementId);
- Meta.Signature signature =
- meta.prepare(h, request.sql, request.maxRowCount)
- .setCursorFactory(Meta.CursorFactory.LIST);
+ meta.prepare(ch, request.sql, request.maxRowCount);
if (json) {
+ Meta.Signature signature = h.signature;
final List<ColumnMetaData> columns = new ArrayList<>();
for (ColumnMetaData column : signature.columns) {
switch (column.type.rep) {
@@ -146,33 +147,37 @@ public class LocalService implements Service {
signature = new Meta.Signature(columns, signature.sql,
signature.parameters, signature.internalParameters,
signature.cursorFactory);
+ h.signature = signature;
}
- return new PrepareResponse(signature);
+ return new PrepareResponse(h);
}
public ResultSetResponse apply(PrepareAndExecuteRequest request) {
- final Meta.StatementHandle h =
- new Meta.StatementHandle(request.statementId);
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet =
- meta.prepareAndExecute(h, request.sql, request.maxRowCount,
+ meta.prepareAndExecute(ch, request.sql, request.maxRowCount,
new Meta.PrepareCallback() {
@Override public Object getMonitor() {
return LocalService.class;
}
- @Override public void clear() {}
+ @Override public void clear() {
+ }
@Override public void assign(Meta.Signature signature,
- Meta.Frame firstFrame) {}
+ Meta.Frame firstFrame) {
+ }
- @Override public void execute() {}
+ @Override public void execute() {
+ }
});
return toResponse(resultSet);
}
public FetchResponse apply(FetchRequest request) {
- final Meta.StatementHandle h =
- new Meta.StatementHandle(request.statementId);
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ request.connectionId, request.statementId, null);
final Meta.Frame frame =
meta.fetch(h, request.parameterValues, request.offset,
request.fetchMaxRowCount);
@@ -182,12 +187,13 @@ public class LocalService implements Service {
public CreateStatementResponse apply(CreateStatementRequest request) {
final Meta.StatementHandle h =
meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
- return new CreateStatementResponse(h.id);
+ return new CreateStatementResponse(h.connectionId, h.id);
}
@Override
public CloseStatementResponse apply(CloseStatementRequest request) {
- meta.closeStatement(new Meta.StatementHandle(request.statementId));
+ meta.closeStatement(new Meta.StatementHandle(
+ request.connectionId, request.statementId, null));
return new CloseStatementResponse();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e8bbfb5..0a5a3a0 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -48,19 +48,20 @@ class RemoteMeta extends MetaImpl {
signature0 = Signature.create(columns,
"?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY);
}
- return new MetaResultSet(response.statementId, response.ownStatement,
- signature0, response.firstFrame);
+ return new MetaResultSet(response.connectionId, response.statementId,
+ response.ownStatement, signature0, response.firstFrame);
}
@Override public StatementHandle createStatement(ConnectionHandle ch) {
final Service.CreateStatementResponse response =
service.apply(new Service.CreateStatementRequest(ch.id));
- return new StatementHandle(response.id);
+ return new StatementHandle(response.connectionId, response.statementId,
+ null);
}
@Override public void closeStatement(StatementHandle h) {
final Service.CloseStatementResponse response =
- service.apply(new Service.CloseStatementRequest(h.id));
+ service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
}
@Override public MetaResultSet getCatalogs() {
@@ -99,21 +100,21 @@ class RemoteMeta extends MetaImpl {
return toResultSet(MetaColumn.class, response);
}
- @Override public Signature prepare(StatementHandle h, String sql,
+ @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
int maxRowCount) {
- final Service.PrepareResponse response =
- service.apply(new Service.PrepareRequest(h.id, sql, maxRowCount));
- return response.signature;
+ final Service.PrepareResponse response = service.apply(
+ new Service.PrepareRequest(ch.id, sql, maxRowCount));
+ return response.statement;
}
- @Override public MetaResultSet prepareAndExecute(StatementHandle h,
+ @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
String sql, int maxRowCount, PrepareCallback callback) {
final Service.ResultSetResponse response;
try {
synchronized (callback.getMonitor()) {
callback.clear();
- response = service.apply(
- new Service.PrepareAndExecuteRequest(h.id, sql, maxRowCount));
+ response = service.apply(new Service.PrepareAndExecuteRequest(
+ ch.id, sql, maxRowCount));
callback.assign(response.signature, response.firstFrame);
}
callback.execute();
@@ -127,8 +128,8 @@ class RemoteMeta extends MetaImpl {
int offset, int fetchMaxRowCount) {
final Service.FetchResponse response =
service.apply(
- new Service.FetchRequest(h.id, parameterValues, offset,
- fetchMaxRowCount));
+ new Service.FetchRequest(h.connectionId, h.id, parameterValues,
+ offset, fetchMaxRowCount));
return response.frame;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index bbe3552..ea03348 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -180,16 +180,20 @@ public interface Service {
* {@link Meta#getTableTypes()}
* return this response. */
class ResultSetResponse extends Response {
+ public final String connectionId;
public final int statementId;
public final boolean ownStatement;
public final Meta.Signature signature;
public final Meta.Frame firstFrame;
@JsonCreator
- public ResultSetResponse(@JsonProperty("statementId") int statementId,
+ public ResultSetResponse(
+ @JsonProperty("connectionId") String connectionId,
+ @JsonProperty("statementId") int statementId,
@JsonProperty("ownStatement") boolean ownStatement,
@JsonProperty("signature") Meta.Signature signature,
@JsonProperty("firstFrame") Meta.Frame firstFrame) {
+ this.connectionId = connectionId;
this.statementId = statementId;
this.ownStatement = ownStatement;
this.signature = signature;
@@ -200,16 +204,16 @@ public interface Service {
/** Request for
* {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
class PrepareAndExecuteRequest extends Request {
- public final int statementId;
+ public final String connectionId;
public final String sql;
public final int maxRowCount;
@JsonCreator
public PrepareAndExecuteRequest(
- @JsonProperty("statementId") int statementId,
+ @JsonProperty("connectionId") String connectionId,
@JsonProperty("sql") String sql,
@JsonProperty("maxRowCount") int maxRowCount) {
- this.statementId = statementId;
+ this.connectionId = connectionId;
this.sql = sql;
this.maxRowCount = maxRowCount;
}
@@ -220,17 +224,18 @@ public interface Service {
}
/** Request for
- * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}. */
+ * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int)}. */
class PrepareRequest extends Request {
- public final int statementId;
+ public final String connectionId;
public final String sql;
public final int maxRowCount;
@JsonCreator
- public PrepareRequest(@JsonProperty("statementId") int statementId,
+ public PrepareRequest(
+ @JsonProperty("connectionId") String connectionId,
@JsonProperty("sql") String sql,
@JsonProperty("maxRowCount") int maxRowCount) {
- this.statementId = statementId;
+ this.connectionId = connectionId;
this.sql = sql;
this.maxRowCount = maxRowCount;
}
@@ -243,18 +248,19 @@ public interface Service {
/** Response from
* {@link org.apache.calcite.avatica.remote.Service.PrepareRequest}. */
class PrepareResponse extends Response {
- public final Meta.Signature signature;
+ public final Meta.StatementHandle statement;
@JsonCreator
public PrepareResponse(
- @JsonProperty("signature") Meta.Signature signature) {
- this.signature = signature;
+ @JsonProperty("statement") Meta.StatementHandle statement) {
+ this.statement = statement;
}
}
/** Request for
* {@link org.apache.calcite.avatica.Meta#fetch(Meta.StatementHandle, List, int, int)}. */
class FetchRequest extends Request {
+ public final String connectionId;
public final int statementId;
public final int offset;
/** Maximum number of rows to be returned in the frame. Negative means no
@@ -265,10 +271,13 @@ public interface Service {
public final List<Object> parameterValues;
@JsonCreator
- public FetchRequest(@JsonProperty("statementId") int statementId,
+ public FetchRequest(
+ @JsonProperty("connectionId") String connectionId,
+ @JsonProperty("statementId") int statementId,
@JsonProperty("parameterValues") List<Object> parameterValues,
@JsonProperty("offset") int offset,
@JsonProperty("fetchMaxRowCount") int fetchMaxRowCount) {
+ this.connectionId = connectionId;
this.statementId = statementId;
this.parameterValues = parameterValues;
this.offset = offset;
@@ -294,10 +303,11 @@ public interface Service {
/** Request for
* {@link org.apache.calcite.avatica.Meta#createStatement(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
class CreateStatementRequest extends Request {
- public final int connectionId;
+ public final String connectionId;
@JsonCreator
- public CreateStatementRequest(@JsonProperty("signature") int connectionId) {
+ public CreateStatementRequest(
+ @JsonProperty("signature") String connectionId) {
this.connectionId = connectionId;
}
@@ -309,21 +319,29 @@ public interface Service {
/** Response from
* {@link org.apache.calcite.avatica.remote.Service.CreateStatementRequest}. */
class CreateStatementResponse extends Response {
- public final int id;
+ public final String connectionId;
+ public final int statementId;
@JsonCreator
- public CreateStatementResponse(@JsonProperty("id") int id) {
- this.id = id;
+ public CreateStatementResponse(
+ @JsonProperty("connectionId") String connectionId,
+ @JsonProperty("statementId") int statementId) {
+ this.connectionId = connectionId;
+ this.statementId = statementId;
}
}
/** Request for
* {@link org.apache.calcite.avatica.Meta#closeStatement(org.apache.calcite.avatica.Meta.StatementHandle)}. */
class CloseStatementRequest extends Request {
+ public final String connectionId;
public final int statementId;
@JsonCreator
- public CloseStatementRequest(@JsonProperty("id") int statementId) {
+ public CloseStatementRequest(
+ @JsonProperty("connectionId") String connectionId,
+ @JsonProperty("statementId") int statementId) {
+ this.connectionId = connectionId;
this.statementId = statementId;
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 9c03b3b..6713632 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -17,6 +17,7 @@
package org.apache.calcite.avatica.test;
import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.jdbc.JdbcMeta;
@@ -27,6 +28,7 @@ import org.apache.calcite.avatica.remote.Service;
import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -72,6 +74,11 @@ public class RemoteDriverTest {
return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRJS);
}
+ @Before
+ public void before() throws Exception {
+ QuasiRemoteJdbcServiceFactory.initService();
+ }
+
@Test public void testRegister() throws Exception {
final Connection connection =
DriverManager.getConnection("jdbc:avatica:remote:");
@@ -194,7 +201,7 @@ public class RemoteDriverTest {
try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
Map<Integer, Statement> serverMap =
- QuasiRemoteJdbcServiceFactory.getRemoteMap(connection);
+ QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
assertEquals(0, clientMap.size());
assertEquals(0, serverMap.size());
Statement stmt = connection.createStatement();
@@ -206,6 +213,33 @@ public class RemoteDriverTest {
}
}
+ @Test public void testConnectionIsolation() throws Exception {
+ final String sql = "select * from (values (1, 'a'))";
+ Connection conn1 = ljs();
+ Connection conn2 = ljs();
+ Map<String, Connection> connectionMap =
+ QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
+ (AvaticaConnection) conn1);
+ assertEquals("should contain at least the default connection",
+ 1, connectionMap.size());
+ PreparedStatement conn1stmt1 = conn1.prepareStatement(sql);
+ assertEquals(
+ "statement creation implicitly creates a connection server-side",
+ 2, connectionMap.size());
+ PreparedStatement conn2stmt1 = conn2.prepareStatement(sql);
+ assertEquals(
+ "statement creation implicitly creates a connection server-side",
+ 3, connectionMap.size());
+ AvaticaPreparedStatement s1 = (AvaticaPreparedStatement) conn1stmt1;
+ AvaticaPreparedStatement s2 = (AvaticaPreparedStatement) conn2stmt1;
+ assertFalse("connection id's should be unique",
+ s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
+ conn2.close();
+ conn1.close();
+// assertEquals("closing a connection closes the server-side connection",
+// 1, connectionMap.size());
+ }
+
private void checkStatementExecuteQuery(Connection connection)
throws SQLException {
final Statement statement = connection.createStatement();
@@ -278,42 +312,51 @@ public class RemoteDriverTest {
connection.close();
}
- /** Factory that creates a service based on a local JDBC connection. */
+ /**
+ * Factory that creates a service based on a local JDBC connection.
+ */
public static class LocalJdbcServiceFactory implements Service.Factory {
@Override public Service create(AvaticaConnection connection) {
try {
- Connection connection1 =
- DriverManager.getConnection(CONNECTION_SPEC.url,
- CONNECTION_SPEC.username, CONNECTION_SPEC.password);
- return new LocalService(new JdbcMeta(connection1));
+ return new LocalService(new JdbcMeta(CONNECTION_SPEC.url,
+ CONNECTION_SPEC.username, CONNECTION_SPEC.password));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
- /** Factory that creates a service based on a local JDBC connection. */
+ /**
+ * Factory that creates a service based on a local JDBC connection.
+ */
public static class QuasiRemoteJdbcServiceFactory implements Service.Factory {
- @Override public Service create(AvaticaConnection connection) {
+
+ /** a singleton instance that is recreated for each test */
+ private static Service service;
+
+ static void initService() {
try {
- Connection connection1 =
- DriverManager.getConnection(CONNECTION_SPEC.url,
- CONNECTION_SPEC.username, CONNECTION_SPEC.password);
- final JdbcMeta jdbcMeta = new JdbcMeta(connection1);
+ final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
+ CONNECTION_SPEC.username, CONNECTION_SPEC.password);
final LocalService localService = new LocalService(jdbcMeta);
- final LocalJsonService localJsonService =
- new LocalJsonService(localService);
- return localJsonService;
+ service = new LocalJsonService(localService);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ @Override public Service create(AvaticaConnection connection) {
+ assert service != null;
+ return service;
+ }
+
/**
* Reach into the guts of a quasi-remote connection and pull out the
* statement map from the other side.
+ * TODO: refactor tests to replace reflection with package-local access
*/
- static Map<Integer, Statement> getRemoteMap(AvaticaConnection connection) throws Exception {
+ static Map<Integer, Statement>
+ getRemoteStatementMap(AvaticaConnection connection) throws Exception {
Field metaF = AvaticaConnection.class.getDeclaredField("meta");
metaF.setAccessible(true);
Meta clientMeta = (Meta) metaF.get(connection);
@@ -332,6 +375,32 @@ public class RemoteDriverTest {
jdbcMetaStatementMapF.setAccessible(true);
return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
}
+
+ /**
+ * Reach into the guts of a quasi-remote connection and pull out the
+ * connection map from the other side.
+ * TODO: refactor tests to replace reflection with package-local access
+ */
+ static Map<String, Connection>
+ getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
+ Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+ metaF.setAccessible(true);
+ Meta clientMeta = (Meta) metaF.get(connection);
+ Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+ remoteMetaServiceF.setAccessible(true);
+ LocalJsonService remoteMetaService = (LocalJsonService) remoteMetaServiceF.get(clientMeta);
+ Field remoteMetaServiceServiceF = remoteMetaService.getClass().getDeclaredField("service");
+ remoteMetaServiceServiceF.setAccessible(true);
+ LocalService remoteMetaServiceService =
+ (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+ Field remoteMetaServiceServiceMetaF =
+ remoteMetaServiceService.getClass().getDeclaredField("meta");
+ remoteMetaServiceServiceMetaF.setAccessible(true);
+ JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+ Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("connectionMap");
+ jdbcMetaStatementMapF.setAccessible(true);
+ return (Map<String, Connection>) jdbcMetaStatementMapF.get(serverMeta);
+ }
}
/** Information necessary to create a JDBC connection. Specify one to run
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index ad2f9cf..849ef63 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -190,7 +190,8 @@ public class CalciteMetaImpl extends MetaImpl {
return Linq4j.asEnumerable(firstFrame.rows);
}
};
- return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+ return new MetaResultSet(connection.id, statement.getId(), true,
+ signature, firstFrame);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -459,16 +460,20 @@ public class CalciteMetaImpl extends MetaImpl {
}
}
- public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
+ @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
+ int maxRowCount) {
+ final StatementHandle h = createStatement(ch);
final CalciteConnectionImpl calciteConnection = getConnection();
CalciteServerStatement statement = calciteConnection.server.getStatement(h);
- return calciteConnection.parseQuery(sql, statement.createPrepareContext(),
+ calciteConnection.parseQuery(sql, statement.createPrepareContext(),
maxRowCount);
+ return h;
}
- public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
- int maxRowCount, PrepareCallback callback) {
+ @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
+ String sql, int maxRowCount, PrepareCallback callback) {
final CalcitePrepare.CalciteSignature<Object> signature;
+ final StatementHandle h = createStatement(ch);
try {
synchronized (callback.getMonitor()) {
callback.clear();
@@ -480,7 +485,7 @@ public class CalciteMetaImpl extends MetaImpl {
callback.assign(signature, null);
}
callback.execute();
- return new MetaResultSet(h.id, false, signature, null);
+ return new MetaResultSet(h.connectionId, h.id, false, signature, null);
} catch (SQLException e) {
throw new RuntimeException(e);
}
[03/10] incubator-calcite git commit: [CALCITE-617] Check at
initialization time in CachingInvocationHandler that MD provider is not null
(Jesus Camacho Rodriguez)
Posted by jh...@apache.org.
[CALCITE-617] Check at initialization time in CachingInvocationHandler that MD provider is not null (Jesus Camacho Rodriguez)
Close apache/incubator-calcite#58
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/321dc430
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/321dc430
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/321dc430
Branch: refs/heads/master
Commit: 321dc4303bccd260eb94a957dbc419fe6e5c6afe
Parents: 61eea9c
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Wed Mar 11 16:55:05 2015 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Fri Mar 27 11:37:44 2015 -0700
----------------------------------------------------------------------
.../apache/calcite/rel/metadata/CachingRelMetadataProvider.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/321dc430/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
index a45501b..516de3a 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/CachingRelMetadataProvider.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.RelNode;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.lang.reflect.InvocationHandler;
@@ -100,7 +101,7 @@ public class CachingRelMetadataProvider implements RelMetadataProvider {
private final Metadata metadata;
public CachingInvocationHandler(Metadata metadata) {
- this.metadata = metadata;
+ this.metadata = Preconditions.checkNotNull(metadata);
}
public Object invoke(Object proxy, Method method, Object[] args)
[05/10] incubator-calcite git commit: [CALCITE-626] Implement
CloseStatement RPC (Nick Dimiduk)
Posted by jh...@apache.org.
[CALCITE-626] Implement CloseStatement RPC (Nick Dimiduk)
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/208eda66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/208eda66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/208eda66
Branch: refs/heads/master
Commit: 208eda6666b51fa5daed80e454dfb5aee0dff652
Parents: aa7cb03
Author: Nick Dimiduk <nd...@gmail.com>
Authored: Tue Mar 24 15:44:48 2015 -0700
Committer: julianhyde <jh...@apache.org>
Committed: Fri Mar 27 13:20:31 2015 -0700
----------------------------------------------------------------------
.../calcite/avatica/AvaticaConnection.java | 4 +-
.../calcite/avatica/AvaticaStatement.java | 7 +++
.../java/org/apache/calcite/avatica/Meta.java | 4 ++
.../apache/calcite/avatica/jdbc/JdbcMeta.java | 14 +++++
.../calcite/avatica/remote/JsonService.java | 9 +++
.../calcite/avatica/remote/LocalService.java | 6 ++
.../calcite/avatica/remote/RemoteMeta.java | 5 ++
.../apache/calcite/avatica/remote/Service.java | 31 +++++++++-
.../calcite/avatica/test/RemoteDriverTest.java | 63 ++++++++++++++++++++
.../apache/calcite/jdbc/CalciteMetaImpl.java | 7 +++
10 files changed, 147 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index bc8c3eb..293cc6c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
/**
@@ -66,7 +67,8 @@ public abstract class AvaticaConnection implements Connection {
protected final AvaticaDatabaseMetaData metaData;
public final Helper helper = Helper.INSTANCE;
public final Map<InternalProperty, Object> properties = new HashMap<>();
- public final Map<Integer, AvaticaStatement> statementMap = new HashMap<>();
+ public final Map<Integer, AvaticaStatement> statementMap =
+ new ConcurrentHashMap<>();
private static int nextId;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 6df3528..2ad74bf 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -142,6 +142,13 @@ public abstract class AvaticaStatement
openResultSet = null;
c.close();
}
+ try {
+ // inform the server to close the resource
+ connection.meta.closeStatement(handle);
+ } finally {
+ // make sure we don't leak on our side
+ connection.statementMap.remove(handle.id);
+ }
// If onStatementClose throws, this method will throw an exception (later
// converted to SQLException), but this statement still gets closed.
connection.driver.handler.onStatementClose(this);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index f7f3b80..cd0166c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -196,6 +196,10 @@ public interface Meta {
*/
StatementHandle createStatement(ConnectionHandle ch);
+ /** Close a statement.
+ */
+ void closeStatement(StatementHandle h);
+
/** Factory to create instances of {@link Meta}. */
interface Factory {
Meta create(List<String> args);
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 56c20c4..72c3948 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -370,6 +370,20 @@ public class JdbcMeta implements Meta {
}
}
+ @Override public void closeStatement(StatementHandle h) {
+ Statement stmt = statementMap.get(h.id).statement;
+ if (stmt == null) {
+ return;
+ }
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ throw propagate(e);
+ } finally {
+ statementMap.remove(h.id);
+ }
+ }
+
private RuntimeException propagate(Throwable e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index cc0ca6e..8ba08cf 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -129,6 +129,15 @@ public abstract class JsonService implements Service {
throw handle(e);
}
}
+
+ @Override
+ public CloseStatementResponse apply(CloseStatementRequest request) {
+ try {
+ return decode(apply(encode(request)), CloseStatementResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
}
// End JsonService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index fa7611e..e302b85 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -184,6 +184,12 @@ public class LocalService implements Service {
meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
return new CreateStatementResponse(h.id);
}
+
+ @Override
+ public CloseStatementResponse apply(CloseStatementRequest request) {
+ meta.closeStatement(new Meta.StatementHandle(request.statementId));
+ return new CloseStatementResponse();
+ }
}
// End LocalService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 10439ce..e8bbfb5 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -58,6 +58,11 @@ class RemoteMeta extends MetaImpl {
return new StatementHandle(response.id);
}
+ @Override public void closeStatement(StatementHandle h) {
+ final Service.CloseStatementResponse response =
+ service.apply(new Service.CloseStatementRequest(h.id));
+ }
+
@Override public MetaResultSet getCatalogs() {
final Service.ResultSetResponse response =
service.apply(new Service.CatalogsRequest());
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index da897da..bbe3552 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -39,6 +39,7 @@ public interface Service {
ResultSetResponse apply(PrepareAndExecuteRequest request);
FetchResponse apply(FetchRequest request);
CreateStatementResponse apply(CreateStatementRequest request);
+ CloseStatementResponse apply(CloseStatementRequest request);
/** Factory that creates a {@code Service}. */
interface Factory {
@@ -62,7 +63,9 @@ public interface Service {
name = "prepareAndExecute"),
@JsonSubTypes.Type(value = FetchRequest.class, name = "fetch"),
@JsonSubTypes.Type(value = CreateStatementRequest.class,
- name = "createStatement") })
+ name = "createStatement"),
+ @JsonSubTypes.Type(value = CloseStatementRequest.class,
+ name = "closeStatement") })
abstract class Request {
abstract Response accept(Service service);
}
@@ -77,7 +80,9 @@ public interface Service {
@JsonSubTypes.Type(value = PrepareResponse.class, name = "prepare"),
@JsonSubTypes.Type(value = FetchResponse.class, name = "fetch"),
@JsonSubTypes.Type(value = CreateStatementResponse.class,
- name = "createStatement") })
+ name = "createStatement"),
+ @JsonSubTypes.Type(value = CloseStatementResponse.class,
+ name = "closeStatement") })
abstract class Response {
}
@@ -311,6 +316,28 @@ public interface Service {
this.id = id;
}
}
+
+ /** Request for
+ * {@link org.apache.calcite.avatica.Meta#closeStatement(org.apache.calcite.avatica.Meta.StatementHandle)}. */
+ class CloseStatementRequest extends Request {
+ public final int statementId;
+
+ @JsonCreator
+ public CloseStatementRequest(@JsonProperty("id") int statementId) {
+ this.statementId = statementId;
+ }
+
+ @Override CloseStatementResponse accept(Service service) {
+ return service.apply(this);
+ }
+ }
+
+ /** Response from
+ * {@link org.apache.calcite.avatica.remote.Service.CloseStatementRequest}. */
+ class CloseStatementResponse extends Response {
+ @JsonCreator
+ public CloseStatementResponse() {}
+ }
}
// End Service.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index c007f02..9c03b3b 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -17,6 +17,8 @@
package org.apache.calcite.avatica.test;
import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.jdbc.JdbcMeta;
import org.apache.calcite.avatica.remote.LocalJsonService;
import org.apache.calcite.avatica.remote.LocalService;
@@ -28,6 +30,7 @@ import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
import org.junit.Ignore;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ParameterMetaData;
@@ -36,6 +39,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -186,6 +190,41 @@ public class RemoteDriverTest {
connection.close();
}
+ @Test public void testStatementLifecycle() throws Exception {
+ try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
+ Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
+ Map<Integer, Statement> serverMap =
+ QuasiRemoteJdbcServiceFactory.getRemoteMap(connection);
+ assertEquals(0, clientMap.size());
+ assertEquals(0, serverMap.size());
+ Statement stmt = connection.createStatement();
+ assertEquals(1, clientMap.size());
+ assertEquals(1, serverMap.size());
+ stmt.close();
+ assertEquals(0, clientMap.size());
+ assertEquals(0, serverMap.size());
+ }
+ }
+
+ private void checkStatementExecuteQuery(Connection connection)
+ throws SQLException {
+ final Statement statement = connection.createStatement();
+ final ResultSet resultSet =
+ statement.executeQuery("select * from (\n"
+ + " values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)");
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ assertEquals(2, metaData.getColumnCount());
+ assertEquals("C1", metaData.getColumnName(1));
+ assertEquals("C2", metaData.getColumnName(2));
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ resultSet.close();
+ statement.close();
+ connection.close();
+ }
+
@Test public void testPrepareBindExecuteFetch() throws Exception {
checkPrepareBindExecuteFetch(ljs());
}
@@ -269,6 +308,30 @@ public class RemoteDriverTest {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Reach into the guts of a quasi-remote connection and pull out the
+ * statement map from the other side.
+ */
+ static Map<Integer, Statement> getRemoteMap(AvaticaConnection connection) throws Exception {
+ Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+ metaF.setAccessible(true);
+ Meta clientMeta = (Meta) metaF.get(connection);
+ Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+ remoteMetaServiceF.setAccessible(true);
+ LocalJsonService remoteMetaService = (LocalJsonService) remoteMetaServiceF.get(clientMeta);
+ Field remoteMetaServiceServiceF = remoteMetaService.getClass().getDeclaredField("service");
+ remoteMetaServiceServiceF.setAccessible(true);
+ LocalService remoteMetaServiceService =
+ (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+ Field remoteMetaServiceServiceMetaF =
+ remoteMetaServiceService.getClass().getDeclaredField("meta");
+ remoteMetaServiceServiceMetaF.setAccessible(true);
+ JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+ Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementMap");
+ jdbcMetaStatementMapF.setAccessible(true);
+ return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
+ }
}
/** Information necessary to create a JDBC connection. Specify one to run
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/208eda66/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index cf395f6..ad2f9cf 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -135,6 +135,13 @@ public class CalciteMetaImpl extends MetaImpl {
return h;
}
+ @Override public void closeStatement(StatementHandle h) {
+ final CalciteConnectionImpl calciteConnection = getConnection();
+ CalciteServerStatement stmt = calciteConnection.server.getStatement(h);
+ // stmt.close(); // TODO: implement
+ calciteConnection.server.removeStatement(h);
+ }
+
private <E> MetaResultSet createResultSet(Enumerable<E> enumerable,
Class clazz, String... names) {
final List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();