You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/05/31 07:36:23 UTC
ignite git commit: IGNITE-5317: Added method to execute SQL fields
query without concrete cache. This closes #2024.
Repository: ignite
Updated Branches:
refs/heads/master 0feadac5f -> c45de1681
IGNITE-5317: Added method to execute SQL fields query without concrete cache. This closes #2024.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45de168
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45de168
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45de168
Branch: refs/heads/master
Commit: c45de1681110e42b88c84d82507b8bc9286182ec
Parents: 0feadac
Author: devozerov <vo...@gridgain.com>
Authored: Wed May 31 10:36:13 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed May 31 10:36:13 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/jdbc2/JdbcConnection.java | 26 +-
.../processors/query/GridQueryIndexing.java | 71 +++--
.../processors/query/GridQueryProcessor.java | 161 ++++++----
.../query/h2/DmlStatementsProcessor.java | 45 +--
.../processors/query/h2/IgniteH2Indexing.java | 306 +++++++------------
.../h2/GridIndexingSpiAbstractSelfTest.java | 121 ++------
6 files changed, 327 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index ee8b605..9385d7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -56,8 +56,12 @@ import org.apache.ignite.compute.ComputeTaskTimeoutException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -544,10 +548,24 @@ public class JdbcConnection implements Connection {
if (!stream)
stmt = new JdbcPreparedStatement(this, sql);
else {
+ GridQueryIndexing idx = ignite().context().query().getIndexing();
+
PreparedStatement nativeStmt = prepareNativeStatement(sql);
- IgniteDataStreamer<?, ?> streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName,
- nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite);
+ if (!idx.isInsertStatement(nativeStmt))
+ throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+ IgniteDataStreamer streamer = ignite().dataStreamer(cacheName);
+
+ streamer.autoFlushFrequency(streamFlushTimeout);
+ streamer.allowOverwrite(streamAllowOverwrite);
+
+ if (streamNodeBufSize > 0)
+ streamer.perNodeBufferSize(streamNodeBufSize);
+
+ if (streamNodeParOps > 0)
+ streamer.perNodeParallelOperations(streamNodeParOps);
stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt);
}
@@ -736,8 +754,8 @@ public class JdbcConnection implements Connection {
/**
* @return Ignite node.
*/
- Ignite ignite() {
- return ignite;
+ IgniteKernal ignite() {
+ return (IgniteKernal)ignite;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 9d66c0a..4429058 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -66,77 +66,79 @@ public interface GridQueryIndexing {
/**
* Parses SQL query into two step query and executes it.
*
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepBinary Keep binary flag.
+ * @param mainCacheId Main cache ID.
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?,?> cctx, SqlQuery qry,
- boolean keepBinary) throws IgniteCheckedException;
+ public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry,
+ boolean keepBinary, int mainCacheId) throws IgniteCheckedException;
/**
* Parses SQL query into two step query and executes it.
*
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepBinary Keep binary flag.
* @param cancel Query cancel.
+ * @param mainCacheId Main cache ID.
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
- public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
- boolean keepBinary, GridQueryCancel cancel) throws IgniteCheckedException;
+ public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
+ boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException;
/**
* Perform a MERGE statement using data streamer as receiver.
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param qry Query.
* @param params Query parameters.
* @param streamer Data streamer to feed data to.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- public long streamUpdateQuery(String cacheName, String qry, @Nullable Object[] params,
+ public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
/**
* Executes regular query.
*
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param filter Cache name and key filter.
* @param keepBinary Keep binary flag.
* @return Cursor.
*/
- public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+ public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName, SqlQuery qry,
IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepBinary Keep binary flag.
* @param filter Cache name and key filter.
* @param cancel Query cancel.
* @return Cursor.
*/
- public FieldsQueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+ public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
/**
* Executes text query.
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param qry Text query.
* @param typeName Type name.
* @param filter Cache name and key filter.
* @return Queried rows.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String cacheName, String qry,
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry,
String typeName, IndexingQueryFilter filter) throws IgniteCheckedException;
/**
@@ -196,11 +198,11 @@ public interface GridQueryIndexing {
/**
* Unregisters type and removes all corresponding data.
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param typeName Type name.
* @throws IgniteCheckedException If failed.
*/
- public void unregisterType(String cacheName, String typeName) throws IgniteCheckedException;
+ public void unregisterType(String schemaName, String typeName) throws IgniteCheckedException;
/**
* Updates index. Note that key is unique for cache, so if cache contains multiple indexes
@@ -231,19 +233,21 @@ public interface GridQueryIndexing {
/**
* Rebuilds all indexes of given type from hash index.
*
- * @param cacheName Cache name.
- * @param type Type descriptor.
+ * @param cctx Cache context.
+ * @param schemaName Schema name.
+ * @param typeName Type name.
* @throws IgniteCheckedException If failed.
*/
- public void rebuildIndexesFromHash(String cacheName, GridQueryTypeDescriptor type) throws IgniteCheckedException;
+ public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName)
+ throws IgniteCheckedException;
/**
* Marks all indexes of given type for rebuild from hash index, making them unusable until rebuild finishes.
*
* @param cacheName Cache name.
- * @param type Type descriptor.
+ * @param typeName Type name.
*/
- public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type);
+ public void markForRebuildFromHash(String cacheName, String typeName);
/**
* Returns backup filter.
@@ -264,11 +268,11 @@ public interface GridQueryIndexing {
/**
* Prepare native statement to retrieve JDBC metadata from.
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param sql Query.
* @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
*/
- public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException;
+ public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException;
/**
* Gets cache name from database schema.
@@ -299,15 +303,18 @@ public interface GridQueryIndexing {
public void cancelAllQueries();
/**
- * @param cacheName Cache name.
+ * Gets database schema from cache name.
+ *
+ * @param cacheName Cache name. {@code null} would be converted to an empty string.
+ * @return Schema name. Should not be null since we should not fail for an invalid cache name.
+ */
+ public String schema(String cacheName);
+
+ /**
+ * Check if passed statement is insert statement.
+ *
* @param nativeStmt Native statement.
- * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
- * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
- * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
- * @param allowOverwrite Overwrite existing cache values on key duplication.
- * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata;
- * {@code null} if given statement is a query.
+ * @return {@code True} if insert.
*/
- public IgniteDataStreamer<?,?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq,
- int nodeBufSize, int nodeParOps, boolean allowOverwrite);
+ public boolean isInsertStatement(PreparedStatement nativeStmt);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 320c90b..990226e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,11 +44,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
@@ -1508,10 +1506,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param desc Type descriptor.
* @return Future that will be completed when rebuilding of all indexes is finished.
*/
- private IgniteInternalFuture<Object> rebuildIndexesFromHash(
- @Nullable final String cacheName,
- @Nullable final QueryTypeDescriptorImpl desc
- ) {
+ private IgniteInternalFuture<Object> rebuildIndexesFromHash(@Nullable final String cacheName,
+ @Nullable final QueryTypeDescriptorImpl desc) {
if (idx == null)
return new GridFinishedFuture<>(new IgniteCheckedException("Indexing is disabled."));
@@ -1520,12 +1516,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
final GridWorkerFuture<Object> fut = new GridWorkerFuture<>();
- idx.markForRebuildFromHash(cacheName, desc);
+ final String schemaName = idx.schema(cacheName);
+ final String typeName = desc.name();
+
+ idx.markForRebuildFromHash(schemaName, typeName);
GridWorker w = new GridWorker(ctx.igniteInstanceName(), "index-rebuild-worker", log) {
@Override protected void body() {
try {
- idx.rebuildIndexesFromHash(cacheName, desc);
+ int cacheId = CU.cacheId(cacheName);
+
+ GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+ idx.rebuildIndexesFromHash(cctx, schemaName, typeName);
fut.onDone();
}
@@ -1533,7 +1536,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
fut.onDone(e);
}
catch (Throwable e) {
- log.error("Failed to rebuild indexes for type: " + desc.name(), e);
+ log.error("Failed to rebuild indexes for type: " + typeName, e);
fut.onDone(e);
@@ -1721,12 +1724,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
final boolean keepBinary) {
checkxEnabled();
- if (qry.isReplicatedOnly() && qry.getPartitions() != null)
- throw new CacheException("Partitions are not supported in replicated only mode.");
-
- if (qry.isDistributedJoins() && qry.getPartitions() != null)
- throw new CacheException(
- "Using both partitions and distributed JOINs is not supported for the same query");
+ validateSqlFieldsQuery(qry);
boolean loc = (qry.isReplicatedOnly() && cctx.isReplicatedAffinityNode()) || cctx.isLocal() || qry.isLocal();
@@ -1734,6 +1732,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
+ final String schemaName = idx.schema(cctx.name());
+ final int mainCacheId = CU.cacheId(cctx.name());
+
IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo;
if (loc) {
@@ -1741,32 +1742,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
GridQueryCancel cancel = new GridQueryCancel();
- final FieldsQueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry, keepBinary,
- idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel);
+ FieldsQueryCursor<List<?>> cur;
- Iterable<List<?>> iterExec = new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
+ if (cctx.config().getQueryParallelism() > 1) {
+ qry.setDistributedJoins(true);
- return cursor.iterator();
- }
- };
+ cur = idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, mainCacheId);
+ }
+ else {
+ IndexingQueryFilter filter = idx.backupFilter(requestTopVer.get(), qry.getPartitions());
- return new QueryCursorImpl<List<?>>(iterExec, cancel) {
- @Override public List<GridQueryFieldMetadata> fieldsMeta() {
- if (cursor instanceof QueryCursorImpl)
- return ((QueryCursorEx)cursor).fieldsMeta();
+ cur = idx.queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel);
+ }
- return super.fieldsMeta();
- }
- };
+ sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
+
+ return cur;
}
};
}
else {
clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
@Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
- return idx.queryDistributedSqlFields(cctx, qry, keepBinary, null);
+ return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId);
}
};
}
@@ -1782,6 +1780,58 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Query SQL fields without strict dependency on concrete cache.
+ *
+ * @param schemaName Schema name.
+ * @param qry Query.
+ * @param keepBinary Keep binary flag.
+ * @return Cursor.
+ */
+ public FieldsQueryCursor<List<?>> querySqlFieldsNoCache(final String schemaName, final SqlFieldsQuery qry,
+ final boolean keepBinary) {
+ checkxEnabled();
+
+ validateSqlFieldsQuery(qry);
+
+ if (qry.isLocal())
+ throw new IgniteException("Local query is not supported without specific cache.");
+
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+ try {
+ IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
+ @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
+ GridQueryCancel cancel = new GridQueryCancel();
+
+ return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, null);
+ }
+ };
+
+ return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), null, clo, true);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validate SQL fields query.
+ *
+ * @param qry Query.
+ */
+ private static void validateSqlFieldsQuery(SqlFieldsQuery qry) {
+ if (qry.isReplicatedOnly() && qry.getPartitions() != null)
+ throw new CacheException("Partitions are not supported in replicated only mode.");
+
+ if (qry.isDistributedJoins() && qry.getPartitions() != null)
+ throw new CacheException("Using both partitions and distributed JOINs is not supported for the same query");
+ }
+
+ /**
* @param cacheName Cache name.
* @param streamer Data streamer.
* @param qry Query.
@@ -1797,9 +1847,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
GridCacheContext cctx = ctx.cache().cache(cacheName).context();
+ final String schemaName = idx.schema(cacheName);
+
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
@Override public Long applyx() throws IgniteCheckedException {
- return idx.streamUpdateQuery(cacheName, qry, args, streamer);
+ return idx.streamUpdateQuery(schemaName, qry, args, streamer);
}
}, true);
}
@@ -1848,10 +1900,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
+ final String schemaName = idx.schema(cctx.name());
+ final int mainCacheId = CU.cacheId(cctx.name());
+
return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
@Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
- return idx.queryDistributedSql(cctx, qry, keepBinary);
+ return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId);
}
}, true);
}
@@ -1874,6 +1929,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
+ final String schemaName = idx.schema(cctx.name());
+ final int mainCacheId = CU.cacheId(cctx.name());
+
try {
return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
@@ -1889,8 +1947,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
qry.getArgs(),
cctx.name());
- return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()),
- keepBinary);
+ if (cctx.config().getQueryParallelism() > 1) {
+ qry.setDistributedJoins(true);
+
+ return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId);
+ }
+ else
+ return idx.queryLocalSql(schemaName, qry, idx.backupFilter(requestTopVer.get(),
+ qry.getPartitions()), keepBinary);
}
}, true);
}
@@ -2036,7 +2100,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
checkxEnabled();
- return idx.prepareNativeStatement(cacheName, sql);
+ String schemaName = idx.schema(cacheName);
+
+ return idx.prepareNativeStatement(schemaName, sql);
}
/**
@@ -2051,21 +2117,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name.
- * @param nativeStmt Native statement.
- * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
- * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
- * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
- * @param allowOverwrite Overwrite existing cache values on key duplication.
- * @see IgniteDataStreamer#allowOverwrite
- * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata.
- */
- public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq,
- int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
- return idx.createStreamer(cacheName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite);
- }
-
- /**
- * @param cacheName Cache name.
* @param key Key.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@@ -2122,8 +2173,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
@Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
String typeName = typeName(cacheName, resType);
+ String schemaName = idx.schema(cacheName);
- return idx.queryLocalText(cacheName, clause, typeName, filters);
+ return idx.queryLocalText(schemaName, clause, typeName, filters);
}
}, true);
}
@@ -2191,7 +2243,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param complete Complete.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public <R> R executeQuery(GridCacheQueryType qryType, String qry, GridCacheContext<?, ?> cctx,
+ public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable GridCacheContext<?, ?> cctx,
IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException {
final long startTime = U.currentTimeMillis();
@@ -2231,7 +2283,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
long duration = U.currentTimeMillis() - startTime;
if (complete || failed) {
- cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed);
+ if (cctx != null)
+ cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed);
if (log.isTraceEnabled())
log.trace("Query execution [startTime=" + startTime + ", duration=" + duration +
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index d48c373..98d123f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
@@ -138,7 +139,7 @@ public class DmlStatementsProcessor {
/**
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
- * @param schema Schema.
+ * @param schemaName Schema.
* @param stmt JDBC statement.
* @param fieldsQry Original query.
* @param loc Query locality flag.
@@ -147,13 +148,13 @@ public class DmlStatementsProcessor {
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
- private UpdateResult updateSqlFields(String schema, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+ private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
- UpdatePlan plan = getPlanForStatement(schema, stmt, null);
+ UpdatePlan plan = getPlanForStatement(schemaName, stmt, null);
GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
@@ -177,7 +178,7 @@ public class DmlStatementsProcessor {
UpdateResult r;
try {
- r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
+ r = executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
}
finally {
cctx.operationContextPerCall(opCtx);
@@ -201,7 +202,7 @@ public class DmlStatementsProcessor {
}
/**
- * @param schema Schema.
+ * @param schemaName Schema.
* @param stmt Prepared statement.
* @param fieldsQry Initial query
* @param cancel Query cancel.
@@ -209,12 +210,12 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schema, PreparedStatement stmt,
+ QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, false, null, cancel);
+ UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel);
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(res.cnt)), null, false);
+ (Collections.singletonList(res.cnt)), cancel, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
@@ -224,7 +225,7 @@ public class DmlStatementsProcessor {
/**
* Execute DML statement on local cache.
*
- * @param schema Schema.
+ * @param schemaName Schema.
* @param stmt Prepared statement.
* @param fieldsQry Fields query.
* @param filters Cache name and key filter.
@@ -233,10 +234,10 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
- GridQueryFieldsResult updateSqlFieldsLocal(String schema, PreparedStatement stmt,
+ GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt,
SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, true, filters, cancel);
+ UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel);
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
@@ -276,8 +277,8 @@ public class DmlStatementsProcessor {
final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum);
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(args),
- null, false, 0, null);
+ final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry,
+ F.asList(args), null, false, 0, null);
QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
@@ -327,6 +328,7 @@ public class DmlStatementsProcessor {
/**
* Actually perform SQL DML operation locally.
*
+ * @param schemaName Schema name.
* @param cctx Cache context.
* @param prepStmt Prepared statement for DML query.
* @param fieldsQry Fields query.
@@ -336,12 +338,14 @@ public class DmlStatementsProcessor {
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"ConstantConditions", "unchecked"})
- private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
- SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel,
- Object[] failedKeys) throws IgniteCheckedException {
+ private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx,
+ PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
+ GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
+ int mainCacheId = CU.cacheId(cctx.name());
+
Integer errKeysPos = null;
- UpdatePlan plan = getPlanForStatement(idx.schema(cctx.name()), prepStmt, errKeysPos);
+ UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos);
if (plan.fastUpdateArgs != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -364,16 +368,17 @@ public class DmlStatementsProcessor {
.setPageSize(fieldsQry.getPageSize())
.setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
- cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(cctx, newFieldsQry, true, cancel);
+ cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel,
+ mainCacheId);
}
else {
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry,
+ final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry,
F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
- return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), cctx.keepBinary());
+ return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1e19954..bd611f6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -398,35 +399,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
- String schemaName = schema(cacheName);
-
- return prepareStatement(connectionForSchema(schemaName), sql, true);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt,
- long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
- Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
-
- if (!(prep instanceof Insert))
- throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
- IgniteDataStreamer streamer = ctx.grid().dataStreamer(cacheName);
-
- streamer.autoFlushFrequency(autoFlushFreq);
-
- streamer.allowOverwrite(allowOverwrite);
-
- if (nodeBufSize > 0)
- streamer.perNodeBufferSize(nodeBufSize);
-
- if (nodeParOps > 0)
- streamer.perNodeParallelOperations(nodeParOps);
+ @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException {
+ Connection conn = connectionForSchema(schemaName);
- return streamer;
+ return prepareStatement(conn, sql, true);
}
/**
@@ -564,7 +540,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridCacheVersion ver,
long expirationTime,
long link) throws IgniteCheckedException {
- H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+ H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name());
if (tbl == null)
return; // Type was rejected.
@@ -588,7 +564,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']');
- H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+ H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name());
if (tbl == null)
return;
@@ -782,13 +758,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
@SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(
- String cacheName, String qry, String typeName,
- IndexingQueryFilter filters) throws IgniteCheckedException {
- H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+ @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry,
+ String typeName, IndexingQueryFilter filters) throws IgniteCheckedException {
+ H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
- GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, cacheName,
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, schemaName,
U.currentTimeMillis(), null, true);
try {
@@ -805,9 +780,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void unregisterType(String cacheName, String typeName)
+ @Override public void unregisterType(String schemaName, String typeName)
throws IgniteCheckedException {
- H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+ H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
if (tbl != null)
removeTable(tbl);
@@ -816,7 +791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param qry Query.
* @param params Query parameters.
* @param filter Cache name and key filter.
@@ -827,12 +802,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- public GridQueryFieldsResult queryLocalSqlFields(final String cacheName, final String qry,
+ public GridQueryFieldsResult queryLocalSqlFields(final String schemaName, final String qry,
@Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException {
- final String schema = schema(cacheName);
-
- final Connection conn = connectionForSchema(schema);
+ final Connection conn = connectionForSchema(schemaName);
H2Utils.setupConnection(conn, false, enforceJoinOrder);
@@ -849,7 +822,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
- return dmlProc.updateSqlFieldsLocal(schema, stmt, fldsQry, filter, cancel);
+ return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel);
}
else if (DdlStatementsProcessor.isDdlStatement(p))
throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
@@ -874,12 +847,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(ctx);
GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
- cacheName, U.currentTimeMillis(), cancel, true);
+ schemaName, U.currentTimeMillis(), cancel, true);
runs.putIfAbsent(run.id(), run);
try {
- ResultSet rs = executeSqlQueryWithTimer(schema, stmt, conn, qry, params, timeout, cancel);
+ ResultSet rs = executeSqlQueryWithTimer(schemaName, stmt, conn, qry, params, timeout, cancel);
return new H2FieldsIterator(rs);
}
@@ -893,10 +866,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public long streamUpdateQuery(String cacheName, String qry,
+ @Override public long streamUpdateQuery(String schemaName, String qry,
@Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
- String schemaName = schema(cacheName);
-
final Connection conn = connectionForSchema(schemaName);
final PreparedStatement stmt;
@@ -1074,97 +1045,76 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
- final SqlFieldsQuery qry, final boolean keepBinary, final IndexingQueryFilter filter,
- final GridQueryCancel cancel) throws IgniteCheckedException {
-
- if (cctx.config().getQueryParallelism() > 1) {
- qry.setDistributedJoins(true);
+ @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
+ final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+ String sql = qry.getSql();
+ Object[] args = qry.getArgs();
- assert qry.isLocal();
+ final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, F.asList(args), filter,
+ qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
- return queryDistributedSqlFields(cctx, qry, keepBinary, cancel);
- }
- else {
- final String cacheName = cctx.name();
- final String sql = qry.getSql();
- final Object[] args = qry.getArgs();
-
- final GridQueryFieldsResult res = queryLocalSqlFields(cacheName, sql, F.asList(args), filter,
- qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
-
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- try {
- return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
- }, cancel);
+ }
+ }, cancel);
- cursor.fieldsMeta(res.metaData());
+ cursor.fieldsMeta(res.metaData());
- return cursor;
- }
+ return cursor;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+ @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName,
final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
- if (cctx.config().getQueryParallelism() > 1) {
- qry.setDistributedJoins(true);
+ String type = qry.getType();
+ String sqlQry = qry.getSql();
+ String alias = qry.getAlias();
+ Object[] params = qry.getArgs();
- assert qry.isLocal();
+ GridQueryCancel cancel = new GridQueryCancel();
- return queryDistributedSql(cctx, qry, keepBinary);
- }
- else {
- String cacheName = cctx.name();
- String type = qry.getType();
- String sqlQry = qry.getSql();
- String alias = qry.getAlias();
- Object[] params = qry.getArgs();
-
- GridQueryCancel cancel = new GridQueryCancel();
-
- final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(cacheName, sqlQry, alias,
- F.asList(params), type, filter, cancel);
-
- return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
- @Override public Iterator<Cache.Entry<K, V>> iterator() {
- return new ClIter<Cache.Entry<K, V>>() {
- @Override public void close() throws Exception {
- i.close();
- }
+ final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(schemaName, sqlQry, alias,
+ F.asList(params), type, filter, cancel);
- @Override public boolean hasNext() {
- return i.hasNext();
- }
+ return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ return new ClIter<Cache.Entry<K, V>>() {
+ @Override public void close() throws Exception {
+ i.close();
+ }
- @Override public Cache.Entry<K, V> next() {
- IgniteBiTuple<K, V> t = i.next();
+ @Override public boolean hasNext() {
+ return i.hasNext();
+ }
- return new CacheEntryImpl<>(
- (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false),
- (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false));
- }
+ @Override public Cache.Entry<K, V> next() {
+ IgniteBiTuple<K, V> t = i.next();
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }, cancel);
- }
+ K key = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get1(), keepBinary, false);
+ V val = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get2(), keepBinary, false);
+
+ return new CacheEntryImpl<>(key, val);
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }, cancel);
}
/**
* Executes regular query.
*
- * @param cacheName Cache name.
+ * @param schemaName Schema name.
* @param qry Query.
* @param alias Table alias.
* @param params Query parameters.
@@ -1174,10 +1124,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String cacheName,
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String schemaName,
final String qry, String alias, @Nullable final Collection<Object> params, String type,
final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
- final H2TableDescriptor tbl = tableDescriptor(type, cacheName);
+ final H2TableDescriptor tbl = tableDescriptor(schemaName, type);
if (tbl == null)
throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1192,13 +1142,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
.distributedJoinMode(OFF));
- GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, cacheName,
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, schemaName,
U.currentTimeMillis(), null, true);
runs.put(run.id(), run);
try {
- ResultSet rs = executeSqlQueryWithTimer(schema(cacheName), conn, sql, params, true, 0, cancel);
+ ResultSet rs = executeSqlQueryWithTimer(schemaName, conn, sql, params, true, 0, cancel);
return new H2KeyValueIterator(rs);
}
@@ -1237,12 +1187,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?, ?> cctx,
- SqlQuery qry, boolean keepBinary) {
+ @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry,
+ boolean keepBinary, int mainCacheId) {
String type = qry.getType();
- String cacheName = cctx.name();
- H2TableDescriptor tblDesc = tableDescriptor(type, cacheName);
+ H2TableDescriptor tblDesc = tableDescriptor(schemaName, type);
if (tblDesc == null)
throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1268,7 +1217,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (qry.getTimeout() > 0)
fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
- final QueryCursor<List<?>> res = queryDistributedSqlFields(cctx, fqry, keepBinary, null);
+ final QueryCursor<List<?>> res = queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId);
final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
@Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1301,12 +1250,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx,
- SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel) {
+ @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName,
+ SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) {
final String sqlQry = qry.getSql();
- String schemaName = schema(cctx.name());
-
Connection c = connectionForSchema(schemaName);
final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
@@ -1413,14 +1360,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
- // Setup caches from schemas.
assert twoStepQry != null;
int tblCnt = twoStepQry.tablesCount();
- if (tblCnt > 0) {
- caches0.add(cctx.cacheId());
+ if (mainCacheId != null)
+ caches0.add(mainCacheId);
+ if (tblCnt > 0) {
for (QueryTable tblKey : twoStepQry.tables()) {
GridH2Table tbl = dataTable(tblKey);
@@ -1429,8 +1376,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
caches0.add(cacheId);
}
}
- else
- caches0.add(cctx.cacheId());
+
+ if (caches0.isEmpty())
+ throw new IgniteSQLException("Failed to find at least one cache for SQL statement: " + sqlQry);
//Prohibit usage indices with different numbers of segments in same query.
List<Integer> cacheIds = new ArrayList<>(caches0);
@@ -1470,6 +1418,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (cachedQry == null && !twoStepQry.explain()) {
cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy());
+
twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
}
@@ -1732,30 +1681,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Gets table descriptor by type and cache names.
+ * Gets the table descriptor registered for a type within a schema.
*
+ * @param schemaName Schema name.
* @param type Type name.
- * @param cacheName Cache name.
- * @return Table descriptor.
+ * @return Table descriptor, or {@code null} if no schema is registered under {@code schemaName}.
*/
- @Nullable private H2TableDescriptor tableDescriptor(String type, String cacheName) {
- String schemaName = schema(cacheName);
-
+ @Nullable private H2TableDescriptor tableDescriptor(String schemaName, String type) {
H2Schema schema = schemas.get(schemaName);
if (schema == null)
return null;
return schema.tableByTypeName(type);
}
- /**
- * Gets database schema from cache name.
- *
- * @param cacheName Cache name. {@code null} would be converted to an empty string.
- * @return Schema name. Should not be null since we should not fail for an invalid cache name.
- */
- public String schema(String cacheName) {
+
+ /** {@inheritDoc} */
+ @Override public String schema(String cacheName) {
String res = cacheName2schema.get(cacheName);
if (res == null)
@@ -1764,6 +1707,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return res;
}
+ /** {@inheritDoc} */
+ @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
+ Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
+
+ return prep instanceof Insert;
+ }
+
/**
* Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}.
*/
@@ -1792,17 +1742,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return schema.cacheName();
}
- /**
- * Rebuild indexes from hash index.
- *
- * @param cacheName Cache name.
- * @param type Type descriptor.
- * @throws IgniteCheckedException If failed.
- */
+ /** {@inheritDoc} */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Override public void rebuildIndexesFromHash(String cacheName,
- GridQueryTypeDescriptor type) throws IgniteCheckedException {
- H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+ @Override public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName)
+ throws IgniteCheckedException {
+ H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
if (tbl == null)
return;
@@ -1815,10 +1759,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Cursor cursor = hashIdx.find((Session)null, null, null);
- int cacheId = CU.cacheId(cacheName);
-
- GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
-
while (cursor.next()) {
CacheDataRow dataRow = (CacheDataRow)cursor.get();
@@ -1859,8 +1799,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type) {
- H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+ @Override public void markForRebuildFromHash(String schemaName, String typeName) {
+ H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
if (tbl == null)
return;
@@ -1871,40 +1811,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Gets size (for tests only).
- *
- * @param cacheName Cache name.
- * @param typeName Type name.
- * @return Size.
- * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown.
- */
- long size(String cacheName, String typeName) throws IgniteCheckedException {
- String schemaName = schema(cacheName);
-
- H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
-
- if (tbl == null)
- return -1;
-
- Connection conn = connectionForSchema(schemaName);
-
- H2Utils.setupConnection(conn, false, false);
-
- try {
- ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(),
- false), 0, null);
-
- if (!rs.next())
- throw new IllegalStateException();
-
- return rs.getLong(1);
- }
- catch (SQLException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
* @return Busy lock.
*/
public GridSpinBusyLock busyLock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 73a7191..7b0cbf8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -247,33 +247,25 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
public void testSpi() throws Exception {
IgniteH2Indexing spi = getIndexing();
- assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name()));
-
IgniteCache<Integer, BinaryObject> cacheA = ignite0.createCache(cacheACfg());
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name()));
-
IgniteCache<Integer, BinaryObject> cacheB = ignite0.createCache(cacheBCfg());
- // Initially all is empty.
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
+ assertFalse(spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select * from A.A", null, Collections.emptySet(),
+ typeAA.name(), null, null).hasNext());
- assertFalse(spi.queryLocalSql(typeAA.cacheName(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
- assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
- assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
+ assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select * from A.B", null, Collections.emptySet(),
+ typeAB.name(), null, null).hasNext());
- assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A, A.B, A.A", null,
+ assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A", null, Collections.emptySet(),
+ typeBA.name(), null, null).hasNext());
+
+ assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A, A.B, A.A", null,
Collections.emptySet(), typeBA.name(), null, null).hasNext());
try {
- spi.queryLocalSql(typeBA.cacheName(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
- Collections.emptySet(), typeBA.name(), null, null).hasNext();
+ spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba",
+ null, Collections.emptySet(), typeBA.name(), null, null).hasNext();
fail("Enumerations of aliases in select block must be prohibited");
}
@@ -281,60 +273,23 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
// all fine
}
- assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select ab.* from A.B ab", null,
+ assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select ab.* from A.B ab", null,
Collections.emptySet(), typeAB.name(), null, null).hasNext());
- assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select ba.* from B.A as ba", null,
+ assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select ba.* from B.A as ba", null,
Collections.emptySet(), typeBA.name(), null, null).hasNext());
cacheA.put(1, aa("A", 1, "Vasya", 10).build());
-
- assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheA.put(1, ab(1, "Vasya", 20, "Some text about Vasya goes here.").build());
-
- // In one cache all keys must be unique.
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheB.put(1, ba(2, "Petya", 25, true).build());
-
- // No replacement because of different cache.
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheB.put(1, ba(2, "Kolya", 25, true).build());
-
- // Replacement in the same table.
- assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheA.put(2, aa("A", 2, "Valera", 19).build());
-
- assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheA.put(3, aa("A", 3, "Borya", 18).build());
-
- assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheA.put(4, ab(4, "Vitalya", 20, "Very Good guy").build());
- assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
// Query data.
- Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res =
- spi.queryLocalSql(typeAA.cacheName(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
+ Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res = spi.queryLocalSql(spi.schema(typeAA.cacheName()),
+ "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
assertEquals(aa("A", 3, "Borya", 18).build(), value(res.next()));
@@ -342,7 +297,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next()));
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeAA.cacheName(), "select aa.* from a aa order by aa.age", null,
+ res = spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select aa.* from a aa order by aa.age", null,
Collections.emptySet(), typeAA.name(), null, null);
assertTrue(res.hasNext());
@@ -351,7 +306,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next()));
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeAB.cacheName(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
+ res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "from b order by name", null, Collections.emptySet(),
+ typeAB.name(), null, null);
assertTrue(res.hasNext());
assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").build(), value(res.next()));
@@ -359,7 +315,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next()));
assertFalse(res.hasNext());
- res = spi.queryLocalSql(typeAB.cacheName(), "select bb.* from b as bb order by bb.name", null,
+ res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select bb.* from b as bb order by bb.name", null,
Collections.emptySet(), typeAB.name(), null, null);
assertTrue(res.hasNext());
@@ -368,16 +324,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next()));
assertFalse(res.hasNext());
-
- res = spi.queryLocalSql(typeBA.cacheName(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
+ res = spi.queryLocalSql(spi.schema(typeBA.cacheName()), "from a", null, Collections.emptySet(), typeBA.name(),
+ null, null);
assertTrue(res.hasNext());
assertEquals(ba(2, "Kolya", 25, true).build(), value(res.next()));
assertFalse(res.hasNext());
// Text queries
- Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(typeAB.cacheName(), "good",
- typeAB.name(), null);
+ Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(spi.schema(typeAB.cacheName()),
+ "good", typeAB.name(), null);
assertTrue(txtRes.hasNext());
assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(txtRes.next()));
@@ -385,7 +341,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
// Fields query
GridQueryFieldsResult fieldsRes =
- spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
+ spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " +
"b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null);
String[] aliases = {"N1", "A1", "N2", "A2"};
@@ -410,33 +366,12 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
// Remove
cacheA.remove(2);
-
- assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
cacheB.remove(1);
- assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
// Unregister.
- spi.unregisterType(typeAA.cacheName(), typeAA.name());
-
- assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
- spi.unregisterType(typeAB.cacheName(), typeAB.name());
-
- assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
- assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name()));
- assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
- spi.unregisterType(typeBA.cacheName(), typeBA.name());
-
- assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
+ spi.unregisterType(spi.schema(typeAA.cacheName()), typeAA.name());
+ spi.unregisterType(spi.schema(typeAB.cacheName()), typeAB.name());
+ spi.unregisterType(spi.schema(typeBA.cacheName()), typeBA.name());
}
/**
@@ -469,8 +404,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
time = now;
range *= 3;
- GridQueryFieldsResult res = spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range),
- null, false, 0, null);
+ GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1,
+ range), null, false, 0, null);
assert res.iterator().hasNext();