You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/29 09:30:42 UTC
[16/24] ignite git commit: IGNITE-5311: Added ability to get
CacheObject value without CacheObjectContext. This closes #2019.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a31263f..8d9d953 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.CacheException;
@@ -100,7 +99,6 @@ import org.jsr166.ConcurrentHashMap8;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -132,7 +130,7 @@ public class GridReduceQueryExecutor {
private final AtomicLong qryIdGen;
/** */
- private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
/** */
private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
@@ -191,8 +189,8 @@ public class GridReduceQueryExecutor {
@Override public void onEvent(final Event evt) {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
- for (QueryRun r : runs.values()) {
- for (GridMergeIndex idx : r.idxs) {
+ for (ReduceQueryRun r : runs.values()) {
+ for (GridMergeIndex idx : r.indexes()) {
if (idx.hasSource(nodeId)) {
handleNodeLeft(r, nodeId);
@@ -208,7 +206,7 @@ public class GridReduceQueryExecutor {
* @param r Query run.
* @param nodeId Left node ID.
*/
- private void handleNodeLeft(QueryRun r, UUID nodeId) {
+ private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
// Will attempt to retry. If reduce query was started it will fail on next page fetching.
retry(r, h2.readyTopologyVersion(), nodeId);
}
@@ -248,7 +246,7 @@ public class GridReduceQueryExecutor {
* @param msg Message.
*/
private void onFail(ClusterNode node, GridQueryFailResponse msg) {
- QueryRun r = runs.get(msg.queryRequestId());
+ ReduceQueryRun r = runs.get(msg.queryRequestId());
fail(r, node.id(), msg.error(), msg.failCode());
}
@@ -258,7 +256,7 @@ public class GridReduceQueryExecutor {
* @param nodeId Failed node ID.
* @param msg Error message.
*/
- private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+ private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
if (r != null) {
CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
@@ -278,21 +276,21 @@ public class GridReduceQueryExecutor {
final int qry = msg.query();
final int seg = msg.segmentId();
- final QueryRun r = runs.get(qryReqId);
+ final ReduceQueryRun r = runs.get(qryReqId);
if (r == null) // Already finished with error or canceled.
return;
- final int pageSize = r.pageSize;
+ final int pageSize = r.pageSize();
- GridMergeIndex idx = r.idxs.get(msg.query());
+ GridMergeIndex idx = r.indexes().get(msg.query());
GridResultPage page;
try {
page = new GridResultPage(ctx, node.id(), msg) {
@Override public void fetchNextPage() {
- Object errState = r.state.get();
+ Object errState = r.state();
if (errState != null) {
CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
@@ -335,7 +333,7 @@ public class GridReduceQueryExecutor {
if (msg.retry() != null)
retry(r, msg.retry(), node.id());
else if (msg.page() == 0) // Do count down on each first page received.
- r.latch.countDown();
+ r.latch().countDown();
}
/**
@@ -343,7 +341,7 @@ public class GridReduceQueryExecutor {
* @param retryVer Retry version.
* @param nodeId Node ID.
*/
- private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+ private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
r.state(retryVer, nodeId);
}
@@ -501,7 +499,7 @@ public class GridReduceQueryExecutor {
}
/**
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepPortable Keep portable.
* @param enforceJoinOrder Enforce join order of tables.
@@ -512,7 +510,7 @@ public class GridReduceQueryExecutor {
* @return Rows iterator.
*/
public Iterator<List<?>> query(
- GridCacheContext<?, ?> cctx,
+ String schemaName,
GridCacheTwoStepQuery qry,
boolean keepPortable,
boolean enforceJoinOrder,
@@ -541,10 +539,8 @@ public class GridReduceQueryExecutor {
final long qryReqId = qryIdGen.incrementAndGet();
- final String cacheName = cctx.name();
-
- final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), cacheName,
- h2.connectionForCache(cacheName), qry.mapQueries().size(), qry.pageSize(),
+ final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
+ h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
U.currentTimeMillis(), cancel);
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -633,7 +629,7 @@ public class GridReduceQueryExecutor {
GridMergeTable tbl;
try {
- tbl = createMergeTable(r.conn, mapQry, qry.explain());
+ tbl = createMergeTable(r.connection(), mapQry, qry.explain());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -641,7 +637,7 @@ public class GridReduceQueryExecutor {
idx = tbl.getMergeIndex();
- fakeTable(r.conn, tblIdx++).innerTable(tbl);
+ fakeTable(r.connection(), tblIdx++).innerTable(tbl);
}
else
idx = GridMergeIndexUnsorted.createDummy(ctx);
@@ -659,13 +655,13 @@ public class GridReduceQueryExecutor {
else
idx.setSources(nodes, segmentsPerIndex);
- idx.setPageSize(r.pageSize);
+ idx.setPageSize(r.pageSize());
- r.idxs.add(idx);
+ r.indexes().add(idx);
}
- r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
- (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+ r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
+ (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
runs.put(qryReqId, r);
@@ -719,7 +715,7 @@ public class GridReduceQueryExecutor {
new GridH2QueryRequest()
.requestId(qryReqId)
.topologyVersion(topVer)
- .pageSize(r.pageSize)
+ .pageSize(r.pageSize())
.caches(qry.cacheIds())
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
@@ -731,7 +727,7 @@ public class GridReduceQueryExecutor {
false)) {
awaitAllReplies(r, nodes, cancel);
- Object state = r.state.get();
+ Object state = r.state();
if (state != null) {
if (state instanceof CacheException) {
@@ -764,7 +760,7 @@ public class GridReduceQueryExecutor {
List<List<?>> res = new ArrayList<>();
// Simple UNION ALL can have multiple indexes.
- for (GridMergeIndex idx : r.idxs) {
+ for (GridMergeIndex idx : r.indexes()) {
Cursor cur = idx.findInStream(null, null);
while (cur.next()) {
@@ -788,21 +784,19 @@ public class GridReduceQueryExecutor {
UUID locNodeId = ctx.localNodeId();
- H2Utils.setupConnection(r.conn, false, enforceJoinOrder);
+ H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize).distributedJoinMode(OFF));
+ .pageSize(r.pageSize()).distributedJoinMode(OFF));
try {
- String schema = h2.schema(cacheName);
-
if (qry.explain())
- return explainPlan(r.conn, schema, qry, params);
+ return explainPlan(r.connection(), schemaName, qry, params);
GridCacheSqlQuery rdc = qry.reduceQuery();
- ResultSet res = h2.executeSqlQueryWithTimer(schema,
- r.conn,
+ ResultSet res = h2.executeSqlQueryWithTimer(schemaName,
+ r.connection(),
rdc.query(),
F.asList(rdc.parameters(params)),
false, // The statement will cache some extra thread local objects.
@@ -824,10 +818,10 @@ public class GridReduceQueryExecutor {
continue;
}
- return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
+ return new GridQueryCacheObjectsIterator(resIter, h2.valueContext(), keepPortable);
}
catch (IgniteCheckedException | RuntimeException e) {
- U.closeQuiet(r.conn);
+ U.closeQuiet(r.connection());
if (e instanceof CacheException) {
if (wasCancelled((CacheException)e))
@@ -898,7 +892,7 @@ public class GridReduceQueryExecutor {
* @param distributedJoins Distributed join flag.
*/
private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
- QueryRun r,
+ ReduceQueryRun r,
long qryReqId,
boolean distributedJoins)
{
@@ -906,7 +900,7 @@ public class GridReduceQueryExecutor {
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
else {
- for (GridMergeIndex idx : r.idxs) {
+ for (GridMergeIndex idx : r.indexes()) {
if (!idx.fetchedAll()) {
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -922,9 +916,9 @@ public class GridReduceQueryExecutor {
* @param cancel Query cancel.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+ private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
throws IgniteInterruptedCheckedException, QueryCancelledException {
- while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+ while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
cancel.checkCancelled();
@@ -932,7 +926,7 @@ public class GridReduceQueryExecutor {
if (!ctx.discovery().alive(node)) {
handleNodeLeft(r, node.id());
- assert r.latch.getCount() == 0;
+ assert r.latch().getCount() == 0;
return;
}
@@ -1420,7 +1414,7 @@ public class GridReduceQueryExecutor {
CacheException err = new CacheException("Query was cancelled, client node disconnected.",
new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
- for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+ for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
e.getValue().disconnected(err);
}
@@ -1435,9 +1429,9 @@ public class GridReduceQueryExecutor {
long curTime = U.currentTimeMillis();
- for (QueryRun run : runs.values()) {
- if (run.qry.longQuery(curTime, duration))
- res.add(run.qry);
+ for (ReduceQueryRun run : runs.values()) {
+ if (run.queryInfo().longQuery(curTime, duration))
+ res.add(run.queryInfo());
}
return res;
@@ -1450,77 +1444,10 @@ public class GridReduceQueryExecutor {
*/
public void cancelQueries(Collection<Long> queries) {
for (Long qryId : queries) {
- QueryRun run = runs.get(qryId);
+ ReduceQueryRun run = runs.get(qryId);
if (run != null)
- run.qry.cancel();
- }
- }
-
- /**
- * Query run.
- */
- private static class QueryRun {
- /** */
- private final GridRunningQueryInfo qry;
-
- /** */
- private final List<GridMergeIndex> idxs;
-
- /** */
- private CountDownLatch latch;
-
- /** */
- private final JdbcConnection conn;
-
- /** */
- private final int pageSize;
-
- /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
- private final AtomicReference<Object> state = new AtomicReference<>();
-
- /**
- * @param id Query ID.
- * @param qry Query text.
- * @param cache Cache where query was executed.
- * @param conn Connection.
- * @param idxsCnt Number of indexes.
- * @param pageSize Page size.
- * @param startTime Start time.
- * @param cancel Query cancel handler.
- */
- private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
- this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
- this.conn = (JdbcConnection)conn;
- this.idxs = new ArrayList<>(idxsCnt);
- this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
- }
-
- /**
- * @param o Fail state object.
- * @param nodeId Node ID.
- */
- void state(Object o, @Nullable UUID nodeId) {
- assert o != null;
- assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
-
- if (!state.compareAndSet(null, o))
- return;
-
- while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
- latch.countDown();
-
- CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
- for (GridMergeIndex idx : idxs) // Fail all merge indexes.
- idx.fail(nodeId, e);
- }
-
- /**
- * @param e Error.
- */
- void disconnected(CacheException e) {
- state(e, null);
+ run.queryInfo().cancel();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
new file mode 100644
index 0000000..73bb002
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.h2.jdbc.JdbcConnection;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
+/**
+ * Query run.
+ */
+class ReduceQueryRun {
+ /** */
+ private final GridRunningQueryInfo qry;
+
+ /** */
+ private final List<GridMergeIndex> idxs;
+
+ /** */
+ private CountDownLatch latch;
+
+ /** */
+ private final JdbcConnection conn;
+
+ /** */
+ private final int pageSize;
+
+ /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+ private final AtomicReference<Object> state = new AtomicReference<>();
+
+ /**
+ * Constructor.
+ *
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param schemaName Schema name.
+ * @param conn Connection.
+ * @param idxsCnt Number of indexes.
+ * @param pageSize Page size.
+ * @param startTime Start time.
+ * @param cancel Query cancel handler.
+ */
+ ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
+ GridQueryCancel cancel) {
+ this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, false);
+
+ this.conn = (JdbcConnection)conn;
+
+ this.idxs = new ArrayList<>(idxsCnt);
+
+ this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+ }
+
+ /**
+ * @param o Fail state object.
+ * @param nodeId Node ID.
+ */
+ void state(Object o, @Nullable UUID nodeId) {
+ assert o != null;
+ assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+ if (!state.compareAndSet(null, o))
+ return;
+
+ while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+ latch.countDown();
+
+ CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
+ for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ idx.fail(nodeId, e);
+ }
+
+ /**
+ * @param e Error.
+ */
+ void disconnected(CacheException e) {
+ state(e, null);
+ }
+
+ /**
+ * @return Query info.
+ */
+ GridRunningQueryInfo queryInfo() {
+ return qry;
+ }
+
+ /**
+ * @return Page size.
+ */
+ int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return Connection.
+ */
+ JdbcConnection connection() {
+ return conn;
+ }
+
+ /**
+ * @return State.
+ */
+ Object state() {
+ return state.get();
+ }
+
+ /**
+ * @return Indexes.
+ */
+ List<GridMergeIndex> indexes() {
+ return idxs;
+ }
+
+ /**
+ * @return Latch.
+ */
+ CountDownLatch latch() {
+ return latch;
+ }
+
+ /**
+ * @param latch Latch.
+ */
+ void latch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 5ac02a5..1f73dcb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -726,7 +727,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return (T)val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 5939b59..b66a343 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -870,7 +870,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
IgniteH2Indexing idx = U.field(qryProcessor, "idx");
- return (JdbcConnection)idx.connectionForCache(DEFAULT_CACHE_NAME);
+ String schemaName = idx.schema(DEFAULT_CACHE_NAME);
+
+ return (JdbcConnection)idx.connectionForSchema(schemaName);
}
/**