You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/31 01:16:36 UTC
[4/4] incubator-ignite git commit: ignite-1142 - fake thread local
tables
ignite-1142 - fake thread local tables
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ab5c7e41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ab5c7e41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ab5c7e41
Branch: refs/heads/ignite-1142
Commit: ab5c7e4116cbdfb13b08cd3f4bafbc1ffa184926
Parents: cfd1fb2
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Jul 31 02:15:47 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Jul 31 02:15:47 2015 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheSqlQuery.java | 18 +-
.../cache/query/GridCacheTwoStepQuery.java | 19 +-
.../processors/query/h2/IgniteH2Indexing.java | 13 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 36 +--
.../query/h2/twostep/GridMergeTable.java | 31 ---
.../h2/twostep/GridReduceQueryExecutor.java | 214 +++++----------
.../query/h2/twostep/GridThreadLocalTable.java | 262 +++++++++++++++++++
.../IgniteCacheAbstractFieldsQuerySelfTest.java | 2 +-
8 files changed, 354 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 256fd7c..d5eb379 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -39,9 +39,6 @@ public class GridCacheSqlQuery implements Message {
public static final Object[] EMPTY_PARAMS = {};
/** */
- private String alias;
-
- /** */
@GridToStringInclude
private String qry;
@@ -66,14 +63,12 @@ public class GridCacheSqlQuery implements Message {
}
/**
- * @param alias Alias.
* @param qry Query.
* @param params Query parameters.
*/
- public GridCacheSqlQuery(String alias, String qry, Object[] params) {
+ public GridCacheSqlQuery(String qry, Object[] params) {
A.ensure(!F.isEmpty(qry), "qry must not be empty");
- this.alias = alias;
this.qry = qry;
this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
@@ -97,13 +92,6 @@ public class GridCacheSqlQuery implements Message {
}
/**
- * @return Alias.
- */
- public String alias() {
- return alias;
- }
-
- /**
* @return Query.
*/
public String query() {
@@ -161,7 +149,7 @@ public class GridCacheSqlQuery implements Message {
switch (writer.state()) {
case 0:
- if (!writer.writeString("alias", alias))
+ if (!writer.writeString("alias", null))
return false;
writer.incrementState();
@@ -192,7 +180,7 @@ public class GridCacheSqlQuery implements Message {
switch (reader.state()) {
case 0:
- alias = reader.readString("alias");
+ reader.readString("alias");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 8613df8..83a79e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -17,10 +17,7 @@
package org.apache.ignite.internal.processors.cache.query;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.*;
@@ -34,7 +31,7 @@ public class GridCacheTwoStepQuery {
/** */
@GridToStringInclude
- private Map<String, GridCacheSqlQuery> mapQrys;
+ private List<GridCacheSqlQuery> mapQrys = new ArrayList<>();
/** */
@GridToStringInclude
@@ -93,15 +90,7 @@ public class GridCacheTwoStepQuery {
* @param qry SQL Query.
*/
public void addMapQuery(GridCacheSqlQuery qry) {
- String alias = qry.alias();
-
- A.ensure(!F.isEmpty(alias), "alias must not be empty");
-
- if (mapQrys == null)
- mapQrys = new GridLeanMap<>();
-
- if (mapQrys.put(alias, qry) != null)
- throw new IgniteException("Failed to add query, alias already exists: " + alias + ".");
+ mapQrys.add(qry);
}
/**
@@ -114,8 +103,8 @@ public class GridCacheTwoStepQuery {
/**
* @return Map queries.
*/
- public Collection<GridCacheSqlQuery> mapQueries() {
- return mapQrys.values();
+ public List<GridCacheSqlQuery> mapQueries() {
+ return mapQrys;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index df6ac49..dc61d76 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1329,14 +1329,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- executeStatement("INFORMATION_SCHEMA", "SHUTDOWN");
-
for (Connection c : conns)
U.close(c, log);
conns.clear();
schemas.clear();
+ try (Connection c = DriverManager.getConnection(dbUrl);
+ Statement s = c.createStatement()) {
+ s.execute("SHUTDOWN");
+ }
+ catch (SQLException e) {
+ U.error(log, "Failed to shutdown database.", e);
+ }
+
if (log.isDebugEnabled())
log.debug("Cache query index stopped.");
}
@@ -1352,9 +1358,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
createSchema(schema);
- executeStatement(schema, "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
- " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
-
createSqlFunctions(schema, ccfg.getSqlFunctionClasses());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 9326b01..2f8bcdd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.h2.*;
import org.apache.ignite.internal.util.typedef.*;
import org.h2.jdbc.*;
-import org.h2.value.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -35,20 +34,20 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlacehol
*/
public class GridSqlQuerySplitter {
/** */
- private static final String TABLE_PREFIX = "__T";
+ private static final String TABLE_SCHEMA = "PUBLIC";
/** */
- private static final String COLUMN_PREFIX = "__C";
+ private static final String TABLE_PREFIX = "__T";
/** */
- public static final String TABLE_FUNC_NAME = "__Z0";
+ private static final String COLUMN_PREFIX = "__C";
/**
* @param idx Index of table.
- * @return Table name.
+ * @return Table.
*/
- private static String table(int idx) {
- return TABLE_PREFIX + idx;
+ public static GridSqlTable table(int idx) {
+ return new GridSqlTable(TABLE_SCHEMA, TABLE_PREFIX + idx);
}
/**
@@ -141,13 +140,11 @@ public class GridSqlQuerySplitter {
// nullifying or updating things, have to make sure that we will not need them in the original form later.
final GridSqlSelect mapQry = wrapUnion(collectAllSpaces(GridSqlQueryParser.parse(stmt), spaces));
- final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO IGNITE-1142
-
final boolean explain = mapQry.explain();
mapQry.explain(false);
- GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction(null, TABLE_FUNC_NAME)); // table(mergeTable)); TODO IGNITE-1142
+ GridSqlSelect rdcQry = new GridSqlSelect().from(table(0));
// Split all select expressions into map-reduce parts.
List<GridSqlElement> mapExps = F.addAll(new ArrayList<GridSqlElement>(mapQry.allColumns()),
@@ -218,10 +215,10 @@ public class GridSqlQuerySplitter {
}
// Build resulting two step query.
- GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, new GridCacheSqlQuery(null, rdcQry.getSQL(),
+ GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, new GridCacheSqlQuery(rdcQry.getSQL(),
findParams(rdcQry, params, new ArrayList<>()).toArray()));
- res.addMapQuery(new GridCacheSqlQuery(mergeTable, mapQry.getSQL(),
+ res.addMapQuery(new GridCacheSqlQuery(mapQry.getSQL(),
findParams(mapQry, params, new ArrayList<>(params.length)).toArray())
.columns(collectColumns(mapExps)));
@@ -458,13 +455,6 @@ public class GridSqlQuerySplitter {
if (idx < rdcSelect.length) { // SELECT __C0 AS original_alias
GridSqlElement rdcEl = column(mapColAlias);
- GridSqlType type = el.resultType();
-
- assert type != null;
-
- if (type.type() == Value.UUID) // There is no JDBC type UUID, so conversion to bytes occurs.
- rdcEl = function(CAST).resultType(GridSqlType.UUID).addChild(rdcEl); // TODO IGNITE-1142 - remove this cast when table function removed
-
if (colNames.add(rdcColAlias)) // To handle column name duplication (usually wildcard for few tables).
rdcEl = alias(rdcColAlias, rdcEl);
@@ -662,12 +652,4 @@ public class GridSqlQuerySplitter {
private static GridSqlFunction function(GridSqlFunctionType type) {
return new GridSqlFunction(type);
}
-
- /**
- * @param name Table name.
- * @return Table.
- */
- private static GridSqlTable table(String name) {
- return new GridSqlTable(null, name);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index c9cdff2..26a92ae 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import org.h2.api.*;
import org.h2.command.ddl.*;
import org.h2.engine.*;
import org.h2.index.*;
@@ -153,34 +152,4 @@ public class GridMergeTable extends TableBase {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
-
- /**
- * Engine.
- */
- public static class Engine implements TableEngine {
- /** */
- private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>();
-
- /**
- * @return Created table.
- */
- public static GridMergeTable getCreated() {
- GridMergeTable tbl = createdTbl.get();
-
- assert tbl != null;
-
- createdTbl.remove();
-
- return tbl;
- }
-
- /** {@inheritDoc} */
- @Override public Table createTable(CreateTableData data) {
- GridMergeTable tbl = new GridMergeTable(data);
-
- createdTbl.set(tbl);
-
- return tbl;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index e34ddd6..6a988e1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -39,12 +39,9 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.h2.command.ddl.*;
import org.h2.engine.*;
-import org.h2.expression.*;
-import org.h2.index.*;
import org.h2.jdbc.*;
import org.h2.result.*;
import org.h2.table.*;
-import org.h2.tools.*;
import org.h2.util.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
@@ -56,6 +53,7 @@ import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
@@ -79,7 +77,10 @@ public class GridReduceQueryExecutor {
private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
/** */
- private static ThreadLocal<GridMergeTable> curFunTbl = new ThreadLocal<>();
+ private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
+
+ /** */
+ private final Lock fakeTblsLock = new ReentrantLock();
/** */
private static final Constructor<JdbcResultSet> CONSTRUCTOR;
@@ -462,11 +463,13 @@ public class GridReduceQueryExecutor {
nodes = Collections.singleton(F.rand(nodes));
}
+ int tblIdx = 0;
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeTable tbl;
try {
- tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
+ tbl = createMergeTable(r.conn, mapQry, qry.explain());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -479,7 +482,7 @@ public class GridReduceQueryExecutor {
r.tbls.add(tbl);
- curFunTbl.set(tbl);
+ fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
}
r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
@@ -499,7 +502,7 @@ public class GridReduceQueryExecutor {
mapQrys = new ArrayList<>(qry.mapQueries().size());
for (GridCacheSqlQuery mapQry : qry.mapQueries())
- mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+ mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
}
if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
@@ -552,8 +555,6 @@ public class GridReduceQueryExecutor {
for (GridMergeTable tbl : r.tbls) {
if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
send(nodes, new GridQueryCancelRequest(qryReqId), null);
-
-// dropTable(r.conn, tbl.getName()); TODO
}
if (retry) {
@@ -587,12 +588,61 @@ public class GridReduceQueryExecutor {
if (!runs.remove(qryReqId, r))
U.warn(log, "Query run was already removed: " + qryReqId);
- curFunTbl.remove();
+ for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+ fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
}
}
}
/**
+ * @param idx Table index.
+ * @return Table name.
+ */
+ private static String table(int idx) {
+ return GridSqlQuerySplitter.table(idx).getSQL();
+ }
+
+ /**
+ * Gets or creates new fake table for index.
+ * <p>
+ * The current list is first read from the volatile {@code fakeTbls} field without locking. Only when
+ * the table for {@code idx} is missing is {@code fakeTblsLock} taken, the size re-checked, a
+ * {@code CREATE TABLE ... ENGINE} statement executed with {@link GridThreadLocalTable.Engine} and a new
+ * list containing the created table assigned to {@code fakeTbls}.
+ *
+ * @param c Connection used to execute {@code CREATE TABLE} when the table has to be created. It is not
+ * dereferenced when the table for {@code idx} already exists.
+ * @param idx Index of table. Must not be greater than the current number of fake tables (asserted).
+ * @return Table.
+ * @throws IllegalStateException If the {@code CREATE TABLE} statement fails with {@link SQLException}.
+ */
+ private GridThreadLocalTable fakeTable(Connection c, int idx) {
+ List<GridThreadLocalTable> tbls = fakeTbls;
+
+ assert tbls.size() >= idx; // At most one table is appended per call, so indexes must not be skipped.
+
+ if (tbls.size() == idx) { // If table for such index does not exist, create one.
+ fakeTblsLock.lock();
+
+ try {
+ if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
+ try (Statement stmt = c.createStatement()) { // NOTE(review): NPE here if c is null, as passed by the cleanup call fakeTable(null, i), and the table is missing.
+ stmt.executeUpdate("CREATE TABLE " + table(idx) +
+ "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
+ }
+ catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+
+ List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1);
+
+ newTbls.addAll(tbls);
+ newTbls.add(GridThreadLocalTable.Engine.getCreated()); // Presumably the instance the engine stored on this thread while executing CREATE TABLE above - TODO confirm.
+
+ fakeTbls = tbls = newTbls; // A fresh list is assigned to the volatile field; the previous list is not modified.
+ }
+ }
+ finally {
+ fakeTblsLock.unlock();
+ }
+ }
+
+ return tbls.get(idx);
+ }
+
+ /**
* Calculates data nodes for replicated caches on unstable topology.
*
* @param cctx Cache context for main space.
@@ -825,16 +875,18 @@ public class GridReduceQueryExecutor {
throws IgniteCheckedException {
List<List<?>> lists = new ArrayList<>();
- for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + mapQry.alias(), null);
+ for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
+ ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null);
lists.add(F.asList(getPlan(rs)));
}
+ int tblIdx = 0;
+
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl = createFunctionTable(c, mapQry, false);
+ GridMergeTable tbl = createMergeTable(c, mapQry, false);
- curFunTbl.set(tbl); // Now it will be only a single table.
+ fakeTable(c, tblIdx++).setInnerTable(tbl);
}
GridCacheSqlQuery rdc = qry.reduceQuery();
@@ -928,118 +980,12 @@ public class GridReduceQueryExecutor {
/**
* @param conn Connection.
- * @param tblName Table name.
- * @throws SQLException If failed.
- */
- private void dropTable(Connection conn, String tblName) throws SQLException {
- try (Statement s = conn.createStatement()) {
- s.execute("DROP TABLE " + tblName);
- }
- }
-
- /**
- * @return Merged result set.
- */
- public static ResultSet mergeTableFunction(JdbcConnection c) throws Exception {
- GridMergeTable tbl = curFunTbl.get();
-
- Session ses = (Session)c.getSession();
-
- String url = c.getMetaData().getURL();
-
- // URL is either "jdbc:default:connection" or "jdbc:columnlist:connection"
- final Cursor cursor = url.charAt(5) == 'c' ? null : tbl.getScanIndex(ses).find(ses, null, null);
-
- final Column[] cols = tbl.getColumns();
-
- SimpleResultSet rs = new SimpleResultSet(cursor == null ? null : new SimpleRowSource() {
- @Override public Object[] readRow() throws SQLException {
- if (!cursor.next())
- return null;
-
- Row r = cursor.get();
-
- Object[] row = new Object[cols.length];
-
- for (int i = 0; i < row.length; i++)
- row[i] = r.getValue(i).getObject();
-
- return row;
- }
-
- @Override public void close() {
- // No-op.
- }
-
- @Override public void reset() throws SQLException {
- throw new SQLException("Unsupported.");
- }
- }) {
- @Override public byte[] getBytes(int colIdx) throws SQLException {
- assert cursor != null;
-
- return cursor.get().getValue(colIdx - 1).getBytes();
- }
-
- @Override public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
- throw new UnsupportedOperationException();
- }
- };
-
- for (Column col : cols)
- rs.addColumn(col.getName(), DataType.convertTypeToSQLType(col.getType()),
- MathUtils.convertLongToInt(col.getPrecision()), col.getScale());
-
- return rs;
- }
-
- /**
- * @param asQuery Query.
- * @return List of columns.
- */
- private static ArrayList<Column> generateColumnsFromQuery(org.h2.command.dml.Query asQuery) {
- int columnCount = asQuery.getColumnCount();
- ArrayList<Expression> expressions = asQuery.getExpressions();
- ArrayList<Column> cols = new ArrayList<>();
- for (int i = 0; i < columnCount; i++) {
- Expression expr = expressions.get(i);
- int type = expr.getType();
- String name = expr.getAlias();
- long precision = expr.getPrecision();
- int displaySize = expr.getDisplaySize();
- DataType dt = DataType.getDataType(type);
- if (precision > 0 && (dt.defaultPrecision == 0 ||
- (dt.defaultPrecision > precision && dt.defaultPrecision < Byte.MAX_VALUE))) {
- // dont' set precision to MAX_VALUE if this is the default
- precision = dt.defaultPrecision;
- }
- int scale = expr.getScale();
- if (scale > 0 && (dt.defaultScale == 0 ||
- (dt.defaultScale > scale && dt.defaultScale < precision))) {
- scale = dt.defaultScale;
- }
- if (scale > precision) {
- precision = scale;
- }
- Column col = new Column(name, type, precision, scale, displaySize);
- cols.add(col);
- }
-
- return cols;
- }
-
- /**
- * @param conn Connection.
* @param qry Query.
* @param explain Explain.
* @return Table.
* @throws IgniteCheckedException
*/
- private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+ private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
try {
Session ses = (Session)conn.getSession();
@@ -1094,32 +1040,6 @@ public class GridReduceQueryExecutor {
}
/**
- * @param conn Connection.
- * @param qry Query.
- * @return Table.
- * @throws IgniteCheckedException If failed.
- */
- private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException {
- try {
- try (PreparedStatement s = conn.prepareStatement(
- "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
- " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " +
- " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) {
- h2.bindParameters(s, F.asList(qry.parameters()));
-
- s.execute();
- }
-
- return GridMergeTable.Engine.getCreated();
- }
- catch (SQLException e) {
- U.closeQuiet(conn);
-
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
* @param reconnectFut Reconnect future.
*/
public void onDisconnected(IgniteFuture<?> reconnectFut) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
new file mode 100644
index 0000000..c468371
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.h2.api.*;
+import org.h2.command.ddl.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.result.*;
+import org.h2.schema.*;
+import org.h2.table.*;
+import org.h2.value.*;
+
+import java.util.*;
+
+/**
+ * Thread local table wrapper for another table instance.
+ * <p>
+ * The wrapped table is kept in a {@link ThreadLocal}, so each thread works with the table it installed
+ * itself through {@link #setInnerTable(Table)}. Almost all methods simply forward to that table; when the
+ * calling thread has not installed one, {@code tbl.get()} yields {@code null} and the forwarding methods
+ * fail with {@link NullPointerException}.
+ * <p>
+ * Not forwarded: {@link #setColumns(Column[])} (always throws), {@link #close(Session)} (no-op) and
+ * {@link #getCreateSQL()} (returns an empty string).
+ */
+public class GridThreadLocalTable extends Table {
+ /** Delegate table, held per thread; {@code null} for a thread until {@link #setInnerTable(Table)} is called on it. */
+ private final ThreadLocal<Table> tbl = new ThreadLocal<>();
+
+ /**
+ * Passes all arguments through to the {@link Table} constructor; no delegate is installed here.
+ *
+ * @param schema Schema.
+ * @param id ID.
+ * @param name Table name.
+ * @param persistIndexes Persist indexes.
+ * @param persistData Persist data.
+ */
+ public GridThreadLocalTable(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
+ super(schema, id, name, persistIndexes, persistData);
+ }
+
+ /**
+ * Installs or clears the delegate table for the calling thread only; other threads are not affected.
+ *
+ * @param t Table or {@code null} to reset existing.
+ */
+ public void setInnerTable(Table t) {
+ if (t == null)
+ tbl.remove(); // Removes the entry for this thread instead of storing null.
+ else
+ tbl.set(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index getPrimaryKey() {
+ return tbl.get().getPrimaryKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column getRowIdColumn() {
+ return tbl.get().getRowIdColumn();
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+ return tbl.get().getBestPlanItem(session, masks, filter, sortOrder);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value getDefaultValue(Session session, Column column) {
+ return tbl.get().getDefaultValue(session, column);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) {
+ return tbl.get().getTemplateSimpleRow(singleColumn);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row getTemplateRow() {
+ return tbl.get().getTemplateRow();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column getColumn(String columnName) {
+ return tbl.get().getColumn(columnName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column getColumn(int index) {
+ return tbl.get().getColumn(index);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index getIndexForColumn(Column column) {
+ return tbl.get().getIndexForColumn(column);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column[] getColumns() {
+ return tbl.get().getColumns();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this wrapper: always throws {@link IllegalStateException} whose message lists
+ * the passed columns.
+ */
+ @Override protected void setColumns(Column[] columns) {
+ throw new IllegalStateException("Cols: " + Arrays.asList(columns));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void lock(Session session, boolean exclusive, boolean force) {
+ tbl.get().lock(session, exclusive, force);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Does nothing and, unlike most methods of this class, does not touch the thread local delegate.
+ */
+ @Override public void close(Session session) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unlock(Session s) {
+ tbl.get().unlock(s);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols,
+ IndexType indexType, boolean create, String indexComment) {
+ return tbl.get().addIndex(session, indexName, indexId, cols, indexType, create, indexComment);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeRow(Session session, Row row) {
+ tbl.get().removeRow(session, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session session) {
+ tbl.get().truncate(session);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addRow(Session session, Row row) {
+ tbl.get().addRow(session, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkSupportAlter() {
+ tbl.get().checkSupportAlter();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTableType() {
+ return tbl.get().getTableType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index getUniqueIndex() {
+ return tbl.get().getUniqueIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Index getScanIndex(Session session) {
+ return tbl.get().getScanIndex(session);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ArrayList<Index> getIndexes() {
+ return tbl.get().getIndexes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLockedExclusively() {
+ return tbl.get().isLockedExclusively();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxDataModificationId() {
+ return tbl.get().getMaxDataModificationId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDeterministic() {
+ return tbl.get().isDeterministic();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetRowCount() {
+ return tbl.get().canGetRowCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canDrop() {
+ return tbl.get().canDrop();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session session) {
+ return tbl.get().getRowCount(session);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return tbl.get().getRowCountApproximation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return tbl.get().getDiskSpaceUsed();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns an empty string; the delegate is not consulted.
+ */
+ @Override public String getCreateSQL() {
+ return "";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDropSQL() {
+ return tbl.get().getDropSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ tbl.get().checkRename();
+ }
+
+ /**
+ * Engine.
+ * <p>
+ * Creates {@link GridThreadLocalTable} instances and hands each created instance over to the creating
+ * thread through a static thread local: {@link #createTable(CreateTableData)} stores it and
+ * {@link #getCreated()} returns and clears it.
+ */
+ public static class Engine implements TableEngine {
+ /** Table created by the last {@code createTable} call on the current thread and not yet taken by {@code getCreated}. */
+ private static ThreadLocal<GridThreadLocalTable> createdTbl = new ThreadLocal<>();
+
+ /**
+ * Returns the table stored for the current thread and clears the holder, so each created table
+ * can be taken only once. Asserts that a table is present.
+ *
+ * @return Created table.
+ */
+ public static GridThreadLocalTable getCreated() {
+ GridThreadLocalTable tbl = createdTbl.get();
+
+ assert tbl != null;
+
+ createdTbl.remove();
+
+ return tbl;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Builds the wrapper from the schema, id, table name and persistence flags of {@code d} and stores it
+ * for the current thread. Asserts that a previously created table has already been taken.
+ */
+ @Override public Table createTable(CreateTableData d) {
+ assert createdTbl.get() == null;
+
+ GridThreadLocalTable tbl = new GridThreadLocalTable(d.schema, d.id, d.tableName, d.persistIndexes,
+ d.persistData);
+
+ createdTbl.set(tbl);
+
+ return tbl;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
index ccb3115..18bfd57 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
@@ -316,7 +316,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
if (cacheMode() == PARTITIONED) {
assertEquals(2, res.size());
- assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.TABLE_FUNC_NAME));
+ assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.table(0).getSQL()));
}
else
assertEquals(1, res.size());