You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/18 18:46:00 UTC
[03/11] incubator-ignite git commit: Squashed commit for ignite-1239
Squashed commit for ignite-1239
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45c813af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45c813af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45c813af
Branch: refs/heads/ignite-gg-10606
Commit: 45c813af7eb4a11ec59d3477a6a0b68791f1d7f2
Parents: 7635e58
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Aug 17 16:51:41 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Aug 17 16:51:41 2015 +0300
----------------------------------------------------------------------
.../GridDhtUnreservedPartitionException.java | 66 ++++++++++++++
.../cache/query/GridCacheQueryAdapter.java | 56 ++++++++++--
.../cache/query/GridCacheQueryManager.java | 71 ++++++++++-----
...CacheScanPartitionQueryFallbackSelfTest.java | 96 ++++++++++++++++++++
4 files changed, 261 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
new file mode 100644
index 0000000..d824a47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
+
+/**
+ * Exception that is thrown when a partition reservation failed.
+ * <p>
+ * Besides the message, it carries the partition that could not be reserved and the affinity topology version
+ * passed in by the code that raised it, so that a caller can inspect both.
+ */
+public class GridDhtUnreservedPartitionException extends IgniteCheckedException {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Partition that could not be reserved. */
+    private final int part;
+
+    /** Affinity topology version supplied when the exception was created. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * @param part Partition.
+     * @param topVer Affinity topology version.
+     * @param msg Message.
+     */
+    public GridDhtUnreservedPartitionException(int part, AffinityTopologyVersion topVer, String msg) {
+        super(msg);
+
+        this.part = part;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Partition.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
+     * @return Affinity topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // The topology version is part of this exception's state, so it is printed together with the partition.
+        return getClass() + " [part=" + part + ", topVer=" + topVer + ", msg=" + getMessage() + ']';
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 953cb9a..90f9b9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
else if (type == SCAN && part != null && nodes.size() > 1)
- return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
+ return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx);
else
return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
}
@@ -554,7 +555,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
/** Backups. */
- private final Queue<ClusterNode> nodes;
+ private volatile Queue<ClusterNode> nodes;
+
+ /** Topology version of the last detected {@link GridDhtUnreservedPartitionException}. */
+ private volatile AffinityTopologyVersion unreservedTopVer;
+
+ /** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */
+ private volatile int unreservedNodesRetryCnt = 5;
/** Bean. */
private final GridCacheQueryBean bean;
@@ -562,16 +569,26 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** Query manager. */
private final GridCacheQueryManager qryMgr;
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Partition. */
+ private final int part;
+
/**
* @param nodes Backups.
+ * @param part Partition.
* @param bean Bean.
* @param qryMgr Query manager.
+ * @param cctx Cache context.
*/
- public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean,
- GridCacheQueryManager qryMgr) {
+ public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean,
+ GridCacheQueryManager qryMgr, GridCacheContext cctx) {
this.nodes = fallbacks(nodes);
this.bean = bean;
this.qryMgr = qryMgr;
+ this.cctx = cctx;
+ this.part = part;
init();
}
@@ -598,7 +615,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
*/
@SuppressWarnings("unchecked")
private void init() {
- ClusterNode node = nodes.poll();
+ final ClusterNode node = nodes.poll();
GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
qryMgr.queryLocal(bean) :
@@ -613,8 +630,33 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
onDone(e);
}
catch (IgniteCheckedException e) {
- if (F.isEmpty(nodes))
- onDone(e);
+ if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
+ unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
+
+ assert unreservedTopVer != null;
+ }
+
+ if (F.isEmpty(nodes)) {
+ final AffinityTopologyVersion topVer = unreservedTopVer;
+
+ if (topVer != null && --unreservedNodesRetryCnt > 0) {
+ cctx.affinity().affinityReadyFuture(topVer).listen(
+ new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(
+ IgniteInternalFuture<AffinityTopologyVersion> future) {
+
+ nodes = fallbacks(cctx.topology().owners(part, topVer));
+
+ // Race is impossible here because query retries are executed one by one.
+ unreservedTopVer = null;
+
+ init();
+ }
+ });
+ }
+ else
+ onDone(e);
+ }
else
init();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 5d3f6a3..bfe5ecc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
+
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -170,7 +171,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Leaves busy state.
+ * Leaves busy state.
*/
private void leaveBusy() {
busyLock.leaveBusy();
@@ -794,7 +795,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
// double check for owning state
if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
locPart.state() != OWNING)
- throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
+ throw new GridDhtUnreservedPartitionException(part,
+ cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
iter = new Iterator<K>() {
private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
@@ -1083,6 +1085,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
boolean rmvRes = true;
+ FieldsResult res = null;
+
final boolean statsEnabled = cctx.config().isStatisticsEnabled();
final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
@@ -1109,7 +1113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
- FieldsResult res = qryInfo.local() ?
+ res = qryInfo.local() ?
executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName,
recipient(qryInfo.senderId(), qryInfo.requestId())) :
fieldsQueryResult(qryInfo, taskName);
@@ -1232,7 +1236,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw (Error)e;
}
finally {
- if (rmvRes)
+ if (qryInfo.local()) {
+ // Don't we need to always remove local iterators?
+ if (rmvRes && res != null) {
+ try {
+ res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+ cctx.nodeId() + "]", e);
+ }
+ }
+ }
+ else if (rmvRes)
removeFieldsQueryResult(qryInfo.senderId(), qryInfo.requestId());
}
}
@@ -1260,6 +1276,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
boolean loc = qryInfo.local();
+ QueryResult<K, V> res = null;
+
if (log.isDebugEnabled())
log.debug("Running query: " + qryInfo);
@@ -1286,8 +1304,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> iter;
GridCacheQueryType type;
- QueryResult<K, V> res;
-
res = loc ?
executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
recipient(qryInfo.senderId(), qryInfo.requestId())) :
@@ -1350,7 +1366,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
log.debug("Record [key=" + key +
", val=" + val +
", incBackups=" + incBackups +
- ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) +
+ ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) +
", node=" + U.id8(cctx.localNode().id()) + ']');
}
@@ -1496,7 +1512,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw (Error)e;
}
finally {
- if (rmvIter)
+ if (loc) {
+ // Local iterators are always removed.
+ if (res != null) {
+ try {
+ res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+ cctx.nodeId() + "]", e);
+ }
+ }
+ }
+ else if (rmvIter)
removeQueryResult(qryInfo.senderId(), qryInfo.requestId());
}
}
@@ -1552,7 +1580,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
- @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
+ @SuppressWarnings({
+ "SynchronizationOnLocalVariableOrMethodParameter",
"NonPrivateFieldAccessedInSynchronizedContext"})
private QueryResult<K, V> queryResult(Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs,
GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
@@ -1680,7 +1709,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Fields query result.
* @throws IgniteCheckedException In case of error.
*/
- @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
+ @SuppressWarnings({
+ "SynchronizationOnLocalVariableOrMethodParameter",
"NonPrivateFieldAccessedInSynchronizedContext"})
private FieldsResult fieldsQueryResult(Map<Long, GridFutureAdapter<FieldsResult>> resMap,
GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
@@ -1868,8 +1898,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* @param <K> Key type.
* @param <V> Value type.
- * @return Predicate.
* @param includeBackups Include backups.
+ * @return Predicate.
*/
@SuppressWarnings("unchecked")
@Nullable public <K, V> IndexingQueryFilter backupsFilter(boolean includeBackups) {
@@ -1933,7 +1963,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** {@inheritDoc} */
@Override public Collection<CacheSqlMetadata> call() {
- final GridKernalContext ctx = ((IgniteKernal) ignite).context();
+ final GridKernalContext ctx = ((IgniteKernal)ignite).context();
Collection<String> cacheNames = F.viewReadOnly(ctx.cache().caches(),
new C1<IgniteInternalCache<?, ?>, String>() {
@@ -2507,7 +2537,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (!filter.apply(key, val))
return null;
- return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ;
+ return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
}
}
@@ -2546,7 +2576,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
idx++;
- while(idx < iters.size()) {
+ while (idx < iters.size()) {
iter = iters.get(idx);
if (iter.hasNextX())
@@ -2598,7 +2628,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/** {@inheritDoc} */
@Override public <T> T unwrap(Class<T> clazz) {
- if(clazz.isAssignableFrom(getClass()))
+ if (clazz.isAssignableFrom(getClass()))
return clazz.cast(this);
throw new IllegalArgumentException();
@@ -2627,7 +2657,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
assert res;
}
-
/**
* Close if this result does not have any other recipients.
*
@@ -2958,8 +2987,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Creates user's full text query, queried class, and query clause.
- * For more information refer to {@link CacheQuery} documentation.
+ * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
+ * documentation.
*
* @param clsName Query class name.
* @param search Search clause.
@@ -2982,14 +3011,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Creates user's SQL fields query for given clause. For more information refer to
- * {@link CacheQuery} documentation.
+ * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
+ * documentation.
*
* @param qry Query.
* @param keepPortable Keep portable flag.
* @return Created query.
*/
- public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
+ public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
A.notNull(qry, "qry");
return new GridCacheQueryAdapter<>(cctx,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 84ceafd..f422e9c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -157,6 +159,90 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
}
/**
+ * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
+ * scan query.
+ *
+ * @throws Exception In case of error.
+ */
+    public void testScanFallbackOnRebalancing() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        clientMode = false;
+        backups = 1;
+        commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+
+        // Declared outside of the try block so that the finally block can always signal the workers to stop.
+        final AtomicBoolean done = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> restartFut = null;
+        IgniteInternalFuture<?> qryFut = null;
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            fillCache(ignite);
+
+            final AtomicInteger idx = new AtomicInteger(GRID_CNT);
+
+            // Every worker repeatedly starts and stops its own additional node while the queries are running.
+            restartFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int id = idx.getAndIncrement();
+
+                        while (!done.get()) {
+                            startGrid(id);
+                            Thread.sleep(3000);
+
+                            stopGrid(id);
+
+                            if (done.get())
+                                return null;
+
+                            Thread.sleep(3000);
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT);
+
+            final AtomicInteger nodeIdx = new AtomicInteger();
+
+            // Every worker keeps running partition scan queries from one of the initially started nodes.
+            qryFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int nodeId = nodeIdx.getAndIncrement();
+
+                        IgniteCacheProxy<Integer, Integer> cache = (IgniteCacheProxy<Integer, Integer>)
+                            grid(nodeId).<Integer, Integer>cache(null);
+
+                        while (!done.get()) {
+                            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+
+                            int part = tup.get1();
+
+                            try {
+                                CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(
+                                    null, part, false);
+
+                                doTestScanQuery(qry);
+                            }
+                            catch (ClusterGroupEmptyCheckedException e) {
+                                log.warning("Invalid partition: " + part, e);
+                            }
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT);
+
+            Thread.sleep(60 * 1000); // Test for one minute
+        }
+        finally {
+            // Always stop the workers, even if setup or the wait above failed; otherwise they would loop forever.
+            done.set(true);
+
+            try {
+                if (qryFut != null)
+                    qryFut.get();
+            }
+            finally {
+                try {
+                    // Wait for the restart workers before stopping all grids, so that no node is started
+                    // concurrently with (or after) the final cleanup.
+                    if (restartFut != null)
+                        restartFut.get();
+                }
+                finally {
+                    stopAllGrids();
+                }
+            }
+        }
+    }
+
+ /**
* Scan should try first remote node and fallbacks to second remote node.
*
* @throws Exception If failed.
@@ -408,4 +494,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
};
}
}
+
+ /**
+ *
+ */
+ private static class TestFallbackOnRebalancingCommunicationSpiFactory implements CommunicationSpiFactory {
+ /** {@inheritDoc} */
+ @Override public TcpCommunicationSpi create() {
+ return new TcpCommunicationSpi();
+ }
+ }
}