You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:28 UTC
[01/16] ignite git commit: ignite-1.4 Fixed Visor cmd options.
Repository: ignite
Updated Branches:
refs/heads/ignite-1093-2 ff0e2e1a2 -> e5da2ca91
ignite-1.4 Fixed Visor cmd options.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cfb6e68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cfb6e68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cfb6e68
Branch: refs/heads/ignite-1093-2
Commit: 5cfb6e6878dea2fa78d7593766035a5b535763a1
Parents: cd6a1d5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Sep 17 16:24:25 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Sep 17 16:24:25 2015 +0700
----------------------------------------------------------------------
.../main/scala/org/apache/ignite/visor/commands/VisorConsole.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cfb6e68/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index 627f795..2abe8a7 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -91,7 +91,7 @@ class VisorConsole {
println(" -np - no pause on exit (pause by default).")
println(" -cfg=<path> - connect with specified configuration.")
println(" -b=<path> - batch mode with file.")
- println(" -e==cmd1;cmd2;... - batch mode with commands.")
+ println(" -e=cmd1;cmd2;... - batch mode with commands.")
visor.quit()
}
[13/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93caa0b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93caa0b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93caa0b8
Branch: refs/heads/ignite-1093-2
Commit: 93caa0b8bb9eb4f5f667d0533c15ce7a9efcdc17
Parents: 2cb397a
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 21:21:17 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 21:21:17 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 11 ++-
.../dht/preloader/GridDhtPartitionSupplier.java | 99 +++++++++-----------
2 files changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 596ec2f..a2f8c01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -827,7 +827,7 @@ public class GridDhtPartitionDemander {
lock.lock();
try {
- remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+ remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts));
}
finally {
lock.unlock();
@@ -949,7 +949,8 @@ public class GridDhtPartitionDemander {
parts.remove(p);
if (parts.isEmpty()) {
- U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+ U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+ "rebalancing [cache=" + cctx.name() +
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
@@ -988,6 +989,9 @@ public class GridDhtPartitionDemander {
*/
private void checkIsDone() {
if (remaining.isEmpty()) {
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
if (log.isDebugEnabled())
log.debug("Completed sync future.");
@@ -1006,9 +1010,6 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
onDone();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 50d64f9..1d8572a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,12 +40,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T4;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -72,7 +70,7 @@ class GridDhtPartitionSupplier {
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
/** Supply context map. */
- private final ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>();
/** Rebalancing listener. */
private GridLocalEventListener lsnr;
@@ -98,19 +96,8 @@ class GridDhtPartitionSupplier {
void start() {
lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- if (evt instanceof CacheRebalancingEvent) {
- ClusterNode node = ((CacheRebalancingEvent)evt).discoveryNode();
-
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
- for (int idx = 0; idx < lsnrCnt; idx++) {
- T2<UUID, Integer> scId = new T2<>(node.id(), idx);
-
- tryClearContext(scMap, scId, log);
- }
- }
- else if (evt instanceof DiscoveryEvent) {
- scMap.clear();
+ if (evt instanceof DiscoveryEvent) {
+ clearContexts(scMap, log, cctx);
}
else {
assert false;
@@ -118,7 +105,9 @@ class GridDhtPartitionSupplier {
}
};
- cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+ //todo: rebalance stopped.
+
+ cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
startOldListeners();
}
@@ -133,19 +122,33 @@ class GridDhtPartitionSupplier {
}
/**
- * Clear context by id.
+ * Clear contexts.
*
* @param map Context map.
- * @param scId Context id.
* @param log Logger.
+ * @param cctx Context.
*/
- private static void tryClearContext(
- ConcurrentHashMap8<T2, SupplyContext> map,
- T2<UUID, Integer> scId,
- IgniteLogger log) {
- SupplyContext sc = map.get(scId);
+ private static void clearContexts(
+ ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) {
+ for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) {
+ clearContext(map, entry.getKey(), entry.getValue(), log, cctx);
+ }
+ }
- if (sc != null) {
+ /**
+ * Clear context.
+ *
+ * @param map Context map.
+ * @param log Logger.
+ */
+ private static boolean clearContext(
+ ConcurrentHashMap8<T4, SupplyContext> map,
+ T4 t,
+ SupplyContext sc,
+ IgniteLogger log,
+ GridCacheContext<?, ?> cctx) {
+
+ if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) {
Iterator it = sc.entryIt;
if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
@@ -159,9 +162,11 @@ class GridDhtPartitionSupplier {
log.error("Iterator close failed.", e);
}
}
+
+ return map.remove(t, sc);
}
- map.remove(scId, sc);
+ return false;
}
/**
@@ -190,21 +195,16 @@ class GridDhtPartitionSupplier {
ClusterNode node = cctx.discovery().node(id);
- T2<UUID, Integer> scId = new T2<>(id, idx);
+ T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence());
try {
SupplyContext sctx = scMap.get(scId);
- if (sctx == null) {
- if (d.partitions().isEmpty())
- return;
- }
- else {
- if (!sctx.top.equals(d.topologyVersion())) {
- tryClearContext(scMap, scId, log);
+ if (sctx == null && d.partitions().isEmpty())
+ return;
- sctx = scMap.get(scId);
- }
+ if (sctx != null && !d.partitions().isEmpty()) {
+ assert false;
}
long bCnt = 0;
@@ -277,7 +277,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
swapLsnr = null;
@@ -321,8 +321,7 @@ class GridDhtPartitionSupplier {
partIt,
null,
swapLsnr,
- part,
- d.topologyVersion());
+ part);
}
}
@@ -353,7 +352,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
swapLsnr = null;
@@ -436,8 +435,7 @@ class GridDhtPartitionSupplier {
partIt,
null,
null,
- part,
- d.topologyVersion());
+ part);
}
}
@@ -465,7 +463,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
swapLsnr = null;
@@ -557,13 +555,12 @@ class GridDhtPartitionSupplier {
* @param swapLsnr Swap listener.
*/
private void saveSupplyContext(
- T2 t,
+ T4 t,
int phase,
Iterator<Integer> partIt,
int part,
- Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
- AffinityTopologyVersion top) {
- scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part, top));
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+ scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
}
/**
@@ -585,9 +582,6 @@ class GridDhtPartitionSupplier {
/** Partition. */
private final int part;
- /** Topology version. */
- private final AffinityTopologyVersion top;
-
/**
* @param phase Phase.
* @param partIt Partition iterator.
@@ -596,13 +590,12 @@ class GridDhtPartitionSupplier {
* @param part Partition.
*/
public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
- GridCacheEntryInfoCollectSwapListener swapLsnr, int part, AffinityTopologyVersion top) {
+ GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
this.phase = phase;
this.partIt = partIt;
this.entryIt = entryIt;
this.swapLsnr = swapLsnr;
this.part = part;
- this.top = top;
}
}
[03/16] ignite git commit: ignite-1452 Cancel cache operations on
node stop
Posted by sb...@apache.org.
ignite-1452 Cancel cache operations on node stop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/585761f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/585761f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/585761f2
Branch: refs/heads/ignite-1093-2
Commit: 585761f28e8b70487eaf2198d6ea39f7232b088d
Parents: b8c0b30
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 17 16:26:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 17 16:26:02 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 7 ---
.../processors/cache/GridCacheContext.java | 6 +--
.../cache/GridCacheEvictionManager.java | 6 +--
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 +++++++++++++-------
.../processors/cache/GridCacheMessage.java | 7 +++
.../processors/cache/GridCacheMvccManager.java | 34 +++++++++++---
.../GridCachePartitionExchangeManager.java | 41 +++++++++++++----
.../processors/cache/GridCacheProcessor.java | 28 ++++++++----
.../GridDistributedLockResponse.java | 6 +--
.../GridDistributedTxPrepareResponse.java | 6 +--
.../distributed/dht/GridDhtTopologyFuture.java | 6 ++-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +++--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++++---
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 ++---
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++++++++++++----
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +--
.../GridDhtPartitionsExchangeFuture.java | 19 ++++++--
.../distributed/near/GridNearGetResponse.java | 6 +--
.../distributed/near/GridNearLockFuture.java | 26 ++++++++---
.../near/GridNearOptimisticTxPrepareFuture.java | 20 +++++++--
.../near/GridNearTxFinishResponse.java | 6 +--
.../cache/query/GridCacheQueryResponse.java | 6 +--
.../continuous/CacheContinuousQueryHandler.java | 12 +++--
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 +++++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++++++---
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
32 files changed, 292 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index daf7d23..82db059 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1806,8 +1806,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP);
}
- GridCacheProcessor cacheProcessor = ctx.cache();
-
List<GridComponent> comps = ctx.components();
ctx.marshallerContext().onKernalStop();
@@ -1856,11 +1854,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Note that interrupted flag is cleared.
interrupted = true;
}
- finally {
- // Cleanup even on successful acquire.
- if (cacheProcessor != null)
- cacheProcessor.cancelUserOperations();
- }
}
if (interrupted)
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 86ba3e6..5385dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -283,12 +283,12 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheEvictionManager evictMgr,
GridCacheQueryManager<K, V> qryMgr,
CacheContinuousQueryManager contQryMgr,
- GridCacheAffinityManager affMgr,
CacheDataStructuresManager dataStructuresMgr,
GridCacheTtlManager ttlMgr,
GridCacheDrManager drMgr,
CacheConflictResolutionManager<K, V> rslvrMgr,
- CachePluginManager pluginMgr
+ CachePluginManager pluginMgr,
+ GridCacheAffinityManager affMgr
) {
assert ctx != null;
assert sharedCtx != null;
@@ -323,12 +323,12 @@ public class GridCacheContext<K, V> implements Externalizable {
this.evictMgr = add(evictMgr);
this.qryMgr = add(qryMgr);
this.contQryMgr = add(contQryMgr);
- this.affMgr = add(affMgr);
this.dataStructuresMgr = add(dataStructuresMgr);
this.ttlMgr = add(ttlMgr);
this.drMgr = add(drMgr);
this.rslvrMgr = add(rslvrMgr);
this.pluginMgr = add(pluginMgr);
+ this.affMgr = add(affMgr);
log = ctx.log(getClass());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 3e0e2f9..1c34c76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1943,7 +1943,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
lock.readLock().unlock();
}
- if (res.error())
+ if (res.evictError())
// Complete future, since there was a class loading error on at least one node.
complete(false);
else
@@ -1985,14 +1985,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
boolean err = F.forAny(resMap.values(), new P1<GridCacheEvictionResponse>() {
@Override public boolean apply(GridCacheEvictionResponse res) {
- return res.error();
+ return res.evictError();
}
});
if (err) {
Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() {
@Override public boolean apply(UUID e) {
- return resMap.get(e).error();
+ return resMap.get(e).evictError();
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index 4d40c8d..aa3911b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -116,7 +116,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
/**
* @return {@code True} if request processing has finished with error.
*/
- boolean error() {
+ boolean evictError() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b55c84d..421ec82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -182,8 +182,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
if (c == null) {
- U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
- ", nodeId=" + nodeId + ']');
+ if (cctx.kernalContext().isStopping()) {
+ if (log.isDebugEnabled())
+ log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
+ else {
+ U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
+ ", nodeId=" + nodeId + ']');
+ }
return;
}
@@ -596,9 +603,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param msg Message to send.
* @param destNodeId Destination node ID.
+ * @return {@code True} if should send message.
* @throws IgniteCheckedException If failed.
*/
- private void onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+ if (msg.error() != null && cctx.kernalContext().isStopping())
+ return false;
+
if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());
@@ -609,6 +620,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled && msg instanceof GridCacheDeployable)
cctx.deploy().prepare((GridCacheDeployable)msg);
}
+
+ return true;
}
/**
@@ -624,7 +637,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
assert !node.isLocal();
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
@@ -663,12 +677,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param msg Message to send.
* @param plc IO policy.
* @param fallback Callback for failed nodes.
- * @return {@code True} if nodes are empty or message was sent, {@code false} if
- * all nodes have left topology while sending this message.
* @throws IgniteCheckedException If send failed.
*/
@SuppressWarnings({"BusyWait", "unchecked"})
- public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
+ public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
@Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
assert nodes != null;
assert msg != null;
@@ -677,10 +689,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Message will not be sent as collection of nodes is empty: " + msg);
- return true;
+ return;
}
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
@@ -709,7 +722,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -721,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
", nodes=" + U.toShortString(nodes) + ']');
- return false;
+ return;
}
}
@@ -737,7 +750,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (fallback != null && !fallback.apply(n))
// If fallback signalled to stop.
- return false;
+ return;
added = true;
}
@@ -757,7 +770,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
U.toShortString(nodes) + ']');
- return false;
+ return;
}
if (log.isDebugEnabled())
@@ -768,8 +781,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-
- return true;
}
/**
@@ -800,7 +811,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc,
long timeout) throws IgniteCheckedException {
- onSend(msg, node.id());
+ if (!onSend(msg, node.id()))
+ return;
int cnt = 0;
@@ -854,7 +866,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
assert node != null;
assert msg != null;
- onSend(msg, null);
+ if (!onSend(msg, null))
+ return;
try {
cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4e737a0..55688e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -77,6 +77,13 @@ public abstract class GridCacheMessage implements Message {
protected int cacheId;
/**
+ * @return Error, if any.
+ */
+ @Nullable public Throwable error() {
+ return null;
+ }
+
+ /**
* Gets next ID for indexed message ID.
*
* @return Message ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 555bbda..e2d0302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -120,6 +120,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
private IgniteLogger exchLog;
+ /** */
+ private volatile boolean stopping;
+
/** Lock callback. */
@GridToStringExclude
private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -325,8 +328,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Cancels all client futures.
*/
- public void cancelClientFutures() {
- cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+ public void onStop() {
+ stopping = true;
+
+ cancelClientFutures(stopError());
}
/** {@inheritDoc} */
@@ -362,6 +367,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Node stop exception.
+ */
+ private IgniteCheckedException stopError() {
+ return new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+ }
+
+ /**
* @param from From version.
* @return To version.
*/
@@ -385,8 +397,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ onFutureAdded(fut);
}
/**
@@ -507,17 +518,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
fut.onNodeLeft(n.id());
}
- if (cctx.kernalContext().clientDisconnected())
- ((GridFutureAdapter)fut).onDone(disconnectedError(null));
-
// Just in case if future was completed before it was added.
if (fut.isDone())
removeFuture(fut);
+ else
+ onFutureAdded(fut);
return true;
}
/**
+ * @param fut Future.
+ */
+ private void onFutureAdded(IgniteInternalFuture<?> fut) {
+ if (stopping)
+ ((GridFutureAdapter)fut).onDone(stopError());
+ else if (cctx.kernalContext().clientDisconnected())
+ ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+ }
+
+ /**
* @param fut Future to remove.
* @return {@code True} if removed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..34c571c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -147,6 +147,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+ /** */
+ private volatile IgniteCheckedException stopErr;
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
- IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+ stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.gridName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
@@ -391,11 +394,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchFuts0 != null) {
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
}
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
+
+ for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+ f.onDone(stopErr);
+
+ if (locExchFut != null)
+ locExchFut.onDone(stopErr);
U.cancel(exchWorker);
@@ -519,6 +528,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.onDone(topVer);
}
+ else if (stopErr != null)
+ fut.onDone(stopErr);
return fut;
}
@@ -791,6 +802,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
+ if (stopErr != null)
+ fut.onDone(stopErr);
+
return fut;
}
@@ -799,12 +813,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param err Error.
*/
public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
- if (err == null) {
- AffinityTopologyVersion topVer = exchFut.topologyVersion();
+ AffinityTopologyVersion topVer = exchFut.topologyVersion();
- if (log.isDebugEnabled())
- log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']');
+ if (log.isDebugEnabled())
+ log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ if (err == null) {
while (true) {
AffinityTopologyVersion readyVer = readyTopVer.get();
@@ -825,8 +839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
}
- else if (log.isDebugEnabled())
- log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']');
+ else {
+ for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+ if (entry.getKey().compareTo(topVer) <= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Completing created topology ready future with error " +
+ "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+
+ entry.getValue().onDone(err);
+ }
+ }
+ }
ExchangeFutureSet exchFuts0 = exchFuts;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4ae0baa..c92de7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -960,6 +960,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ cancelFutures();
+
List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
@@ -1323,12 +1325,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -1452,12 +1454,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
evictMgr,
qryMgr,
contQryMgr,
- affMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
- pluginMgr
+ pluginMgr,
+ affMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -2325,9 +2327,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3036,9 +3043,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
- if (ctx.clientDisconnected())
+ if (ctx.isStopping()) {
+ err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+ "node is stopping.");
+ }
+ else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
+ }
}
catch (IgniteCheckedException e) {
err = e;
@@ -3104,8 +3116,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Cancel all user operations.
*/
- public void cancelUserOperations() {
- sharedCtx.mvcc().cancelClientFutures();
+ private void cancelFutures() {
+ sharedCtx.mvcc().onStop();
Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdb878d..8a95b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -137,10 +137,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
return futId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 4264830..e798458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -67,10 +67,8 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
this.err = err;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index c11a3d7..6ade26f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.jetbrains.annotations.Nullable;
/**
* Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
@@ -38,9 +39,10 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
public AffinityTopologyVersion topologyVersion();
/**
- * Returns is cache topology valid.
+ * Returns error is cache topology is not valid.
+ *
* @param cctx Cache context.
* @return valid ot not.
*/
- public boolean isCacheTopologyValid(GridCacheContext cctx);
+ @Nullable public Throwable validateCache(GridCacheContext cctx);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b9514a9..1a869e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1217,7 +1217,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
Throwable err = res.error();
// Log error before sending reply.
- if (err != null && !(err instanceof GridCacheLockTimeoutException))
+ if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping())
U.error(log, "Failed to acquire lock for request: " + req, err);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 33651bc..04d36e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -97,16 +97,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void onError(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Gets update error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
@@ -154,8 +153,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
nearEvicted.add(key);
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d93f68f..fb2c5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -385,9 +385,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -811,6 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
int size = keys.size();
@@ -837,13 +839,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq = mapSingleUpdate();
+ singleReq0 = singleReq = mapSingleUpdate();
}
else {
pendingMappings = mapUpdate(topNodes);
if (pendingMappings.size() == 1)
- singleReq = F.firstValue(pendingMappings);
+ singleReq0 = singleReq = F.firstValue(pendingMappings);
else {
if (syncMode == PRIMARY_SYNC) {
mappings = U.newHashMap(pendingMappings.size());
@@ -874,8 +876,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- if (singleReq != null)
- mapSingle(singleReq.nodeId(), singleReq);
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
else {
assert pendingMappings != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 5f5fbb5..ccb67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -198,6 +198,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
boolean skipStore,
boolean clientReq
) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8bc145c..376f4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -116,6 +116,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param futVer Future version.
*/
public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ assert futVer != null;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
@@ -149,16 +151,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void error(IgniteCheckedException err){
this.err = err;
}
- /**
- * @return Update error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 596ec77..1a08265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -524,7 +525,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtColocatedLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ if (isMini(f)) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ else
+ return "[loc=true, done=" + f.isDone() + "]";
+ }
+ });
+
+ return S.toString(GridDhtColocatedLockFuture.class, this,
+ "innerFuts", futs,
+ "inTx", inTx(),
+ "super", super.toString());
}
/**
@@ -565,9 +581,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -612,9 +629,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -643,10 +661,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1327,8 +1350,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 36a2da1..eaed424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -283,7 +283,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
fut.onResult((ClusterTopologyCheckedException)e);
- else
+ else if (!cctx.kernalContext().isStopping())
fut.onResult(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index d31f096..93e39ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -98,10 +98,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
this.err = err;
}
- /**
- * @return Error, if any.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 865bbdc..a1b03c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1081,9 +1081,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/** {@inheritDoc} */
- @Override public boolean isCacheTopologyValid(GridCacheContext cctx) {
- return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ?
- cacheValidRes.get(cctx.cacheId()) : true;
+ @Override public Throwable validateCache(GridCacheContext cctx) {
+ Throwable err = error();
+
+ if (err != null)
+ return err;
+
+ if (cctx.config().getTopologyValidator() != null) {
+ Boolean res = cacheValidRes.get(cctx.cacheId());
+
+ if (res != null && !res) {
+ return new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache topology is not valid): " + cctx.name());
+ }
+ }
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 3276377..d4493a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -163,10 +163,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
return topVer != null ? topVer : super.topologyVersion();
}
- /**
- * @return Error.
- */
- public IgniteCheckedException error() {
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f3e5ca3..dcc8da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -703,9 +703,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
if (fut.topologyVersion().equals(topVer)){
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -749,9 +750,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
return;
}
@@ -777,10 +779,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
+ fut.get();
+
mapOnTopology(remap);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
@@ -1435,8 +1442,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
+ fut.get();
+
remap();
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.shared().txContextReset();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2048fdf..25028c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -319,7 +319,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
assert ctx != null : cacheId;
- if (!topFut.isCacheTopologyValid(ctx)) {
+ Throwable err = topFut.validateCache(ctx);
+
+ if (err != null) {
if (invalidCaches != null)
invalidCaches.append(", ");
else
@@ -343,12 +345,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
else {
topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
try {
+ fut.get();
+
prepareOnTopology(remap, c);
}
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
finally {
cctx.txContextReset();
}
@@ -841,7 +848,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (affFut != null && !affFut.isDone()) {
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- remap();
+ try {
+ fut.get();
+
+ remap();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index cec7d73..c860baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -75,10 +75,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
this.err = err;
}
- /**
- * @return Error.
- */
- @Nullable public Throwable error() {
+ /** {@inheritDoc} */
+ @Nullable @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 3e4cdeb..78e2ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -193,10 +193,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
return reqId;
}
- /**
- * @return Error.
- */
- public Throwable error() {
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
return err;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index df6b4b7..c99e07f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
+ /** */
+ private transient int cacheId;
+
/**
* Required by {@link Externalizable}.
*/
@@ -145,6 +149,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+
+ cacheId = CU.cacheId(cacheName);
}
/** {@inheritDoc} */
@@ -457,6 +463,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
sync = in.readBoolean();
ignoreExpired = in.readBoolean();
taskHash = in.readInt();
+
+ cacheId = CU.cacheId(cacheName);
}
/**
@@ -466,9 +474,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
assert ctx != null;
- GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
-
- return cache == null ? null : cache.context();
+ return ctx.cache().<K, V>context().cacheContext(cacheId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 00b91dd..6ca1f72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1105,6 +1105,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Commits transaction to transaction manager. Used for one-phase commit transactions only.
+ *
+ * @param commit If {@code true} commits transaction, otherwise rollbacks.
*/
public void tmFinish(boolean commit) {
assert onePhaseCommit();
@@ -1118,7 +1120,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
state(commit ? COMMITTED : ROLLED_BACK);
- boolean needsCompletedVersions = needsCompletedVersions();
+ boolean needsCompletedVersions = commit && needsCompletedVersions();
assert !needsCompletedVersions || completedBase != null;
assert !needsCompletedVersions || committedVers != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
index 2aae6ef..6bfd4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
/**
@@ -76,6 +77,15 @@ public class GridSpinBusyLock {
}
/**
+ * @param millis Timeout.
+ * @return {@code True} if lock was acquired.
+ * @throws InterruptedException If interrupted.
+ */
+ public boolean tryBlock(long millis) throws InterruptedException {
+ return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Makes possible for activities entering busy state again.
*/
public void unblock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 6b4d473..151167a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -184,20 +185,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
String val = "value-" + k;
- cache.invoke(key, new Processor(val));
+ procs.put(key, new Processor(val));
}
- cache.invokeAll(procs);
+ Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs);
+
+ for (String key : procs.keySet()) {
+ EntryProcessorResult<Integer> res = resMap.get(key);
+
+ assertNotNull(res);
+ assertEquals(k + 1, (Object) res.get());
+ }
}
else {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
for (int i = 0; i < NUM_SETS; i++) {
String key = "set-" + i;
String val = "value-" + k;
- IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+ Integer valsCnt = cache.invoke(key, new Processor(val));
- cache.invoke(key, new Processor(val));
+ assertEquals(k + 1, (Object)valsCnt);
}
}
}
@@ -275,7 +285,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** */
- private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable {
/** */
private String val;
@@ -287,7 +297,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) {
Set<String> vals = e.getValue();
if (vals == null)
@@ -297,7 +307,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
e.setValue(vals);
- return null;
+ return vals.size();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 7aae48c..88605b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -79,12 +79,12 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheEvictionManager(),
new GridCacheLocalQueryManager<K, V>(),
new CacheContinuousQueryManager(),
- new GridCacheAffinityManager(),
new CacheDataStructuresManager(),
new GridCacheTtlManager(),
new GridOsCacheDrManager(),
new CacheOsConflictResolutionManager<K, V>(),
- new CachePluginManager(ctx, new CacheConfiguration())
+ new CachePluginManager(ctx, new CacheConfiguration()),
+ new GridCacheAffinityManager()
);
store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());
http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 1276405..e00611b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -185,8 +185,6 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
* @throws Exception If failed.
*/
public void testRestarts() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1452");
-
int duration = 90 * 1000;
int qryThreadNum = 4;
int restartThreadsNum = 2; // 4 + 2 = 6 nodes
[12/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2cb397ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2cb397ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2cb397ad
Branch: refs/heads/ignite-1093-2
Commit: 2cb397add05d07ea6199e9de5ae59474f8f94c26
Parents: 9e0eafe
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 18:09:21 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 18:09:21 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionSupplier.java | 32 ++++++++++----------
1 file changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2cb397ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0641612..50d64f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -48,6 +48,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
@@ -97,28 +98,27 @@ class GridDhtPartitionSupplier {
void start() {
lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
- for (int idx = 0; idx < lsnrCnt; idx++) {
- ClusterNode node;
- if (evt instanceof CacheRebalancingEvent)
- node = ((CacheRebalancingEvent)evt).discoveryNode();
- else if (evt instanceof DiscoveryEvent)
- node = ((DiscoveryEvent)evt).eventNode();
- else {
- assert false;
-
- return;
- }
+ if (evt instanceof CacheRebalancingEvent) {
+ ClusterNode node = ((CacheRebalancingEvent)evt).discoveryNode();
- T2<UUID, Integer> scId = new T2<>(node.id(), idx);
+ int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
- tryClearContext(scMap, scId, log);
+ for (int idx = 0; idx < lsnrCnt; idx++) {
+ T2<UUID, Integer> scId = new T2<>(node.id(), idx);
+
+ tryClearContext(scMap, scId, log);
+ }
+ }
+ else if (evt instanceof DiscoveryEvent) {
+ scMap.clear();
+ }
+ else {
+ assert false;
}
}
};
- cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+ cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
startOldListeners();
}
[14/16] ignite git commit: ignite-1500 Invalid docs in distributed
data structures classes
Posted by sb...@apache.org.
ignite-1500 Invalid docs in distributed data structures classes
Signed-off-by: Anton Vinogradov <av...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1021d4ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1021d4ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1021d4ea
Branch: refs/heads/ignite-1093-2
Commit: 1021d4eaf4b8318786f3ea7e44e5acef2fbb9004
Parents: c28cec1
Author: agura <ag...@gridgain.com>
Authored: Fri Sep 18 15:24:46 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 21 10:34:35 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteAtomicLong.java | 15 ++++++---------
.../org/apache/ignite/IgniteAtomicReference.java | 9 +++------
.../java/org/apache/ignite/IgniteAtomicSequence.java | 9 +++------
.../java/org/apache/ignite/IgniteAtomicStamped.java | 13 +++++--------
4 files changed, 17 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1021d4ea/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
index 83e2525..77a5830 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
@@ -26,29 +26,26 @@ import java.io.Closeable;
* Distributed atomic long includes the following main functionality:
* <ul>
* <li>
- * Method {@link #get()} synchronously gets current value of atomic long.
+ * Method {@link #get()} gets current value of atomic long.
* </li>
* <li>
- * Various {@code get..(..)} methods synchronously get current value of atomic long
+ * Various {@code get..(..)} methods get current value of atomic long
* and increase or decrease value of atomic long.
* </li>
* <li>
- * Method {@link #addAndGet(long l)} synchronously sums {@code l} with current value of atomic long
+ * Method {@link #addAndGet(long l)} sums {@code l} with current value of atomic long
* and returns result.
* </li>
* <li>
- * Method {@link #incrementAndGet()} synchronously increases value of atomic long and returns result.
+ * Method {@link #incrementAndGet()} increases value of atomic long and returns result.
* </li>
* <li>
- * Method {@link #decrementAndGet()} synchronously decreases value of atomic long and returns result.
+ * Method {@link #decrementAndGet()} decreases value of atomic long and returns result.
* </li>
* <li>
- * Method {@link #getAndSet(long l)} synchronously gets current value of atomic long and sets {@code l}
+ * Method {@link #getAndSet(long l)} gets current value of atomic long and sets {@code l}
* as value of atomic long.
* </li>
- * </ul>
- * All previously described methods have asynchronous analogs.
- * <ul>
* <li>
* Method {@link #name()} gets name of atomic long.
* </li>
http://git-wip-us.apache.org/repos/asf/ignite/blob/1021d4ea/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java
index a33cf35..a1e6e8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java
@@ -26,17 +26,14 @@ import java.io.Closeable;
* Distributed atomic reference includes the following main functionality:
* <ul>
* <li>
- * Method {@link #get()} synchronously gets current value of an atomic reference.
+ * Method {@link #get()} gets current value of an atomic reference.
* </li>
* <li>
- * Method {@link #set(Object)} synchronously and unconditionally sets the value in the an atomic reference.
+ * Method {@link #set(Object)} unconditionally sets the value in the an atomic reference.
* </li>
* <li>
- * Methods {@code compareAndSet(...)} synchronously and conditionally set the value in the an atomic reference.
+ * Methods {@code compareAndSet(...)} conditionally set the value in the an atomic reference.
* </li>
- * </ul>
- * All previously described methods have asynchronous analogs.
- * <ul>
* <li>
* Method {@link #name()} gets name of atomic reference.
* </li>
http://git-wip-us.apache.org/repos/asf/ignite/blob/1021d4ea/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
index 313aba5..a1e1392 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java
@@ -26,19 +26,16 @@ import java.io.Closeable;
* Distributed atomic sequence includes the following main functionality:
* <ul>
* <li>
- * Method {@link #get()} synchronously gets current value from atomic sequence.
+ * Method {@link #get()} gets current value from atomic sequence.
* </li>
* <li>
- * Various {@code get..(..)} methods synchronously get current value from atomic sequence
+ * Various {@code get..(..)} methods get current value from atomic sequence
* and increase atomic sequences value.
* </li>
* <li>
- * Various {@code add..(..)} {@code increment(..)} methods synchronously increase atomic sequences value
+ * Various {@code add..(..)} {@code increment(..)} methods increase atomic sequences value
* and return increased value.
* </li>
- * </ul>
- * All previously described methods have asynchronous analogs.
- * <ul>
* <li>
* Method {@link #batchSize(int size)} sets batch size of current atomic sequence.
* </li>
http://git-wip-us.apache.org/repos/asf/ignite/blob/1021d4ea/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java
index c05fdae..6c73248 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java
@@ -27,25 +27,22 @@ import org.apache.ignite.lang.IgniteBiTuple;
* Distributed atomic stamped includes the following main functionality:
* <ul>
* <li>
- * Method {@link #get()} synchronously gets both value and stamp of atomic.
+ * Method {@link #get()} gets both value and stamp of atomic.
* </li>
* <li>
- * Method {@link #value()} synchronously gets current value of atomic.
+ * Method {@link #value()} gets current value of atomic.
* </li>
* <li>
- * Method {@link #stamp()} synchronously gets current stamp of atomic.
+ * Method {@link #stamp()} gets current stamp of atomic.
* </li>
* <li>
- * Method {@link #set(Object, Object)} synchronously and unconditionally sets the value
+ * Method {@link #set(Object, Object)} unconditionally sets the value
* and the stamp in the atomic.
* </li>
* <li>
- * Methods {@code compareAndSet(...)} synchronously and conditionally set the value
+ * Methods {@code compareAndSet(...)} conditionally set the value
* and the stamp in the atomic.
* </li>
- * </ul>
- * All previously described methods have asynchronous analogs.
- * <ul>
* <li>
* Method {@link #name()} gets name of atomic stamped.
* </li>
[10/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c1aa262
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c1aa262
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c1aa262
Branch: refs/heads/ignite-1093-2
Commit: 8c1aa262948aea667b9b9f22630e084f684481e1
Parents: 65a9c28
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 16:04:52 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 16:04:52 2015 +0300
----------------------------------------------------------------------
.../rebalancing/GridCacheRebalancingSyncSelfTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1aa262/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 39f5d4b..712f3cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -110,8 +110,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
cacheRCfg.setCacheMode(CacheMode.REPLICATED);
cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
- cachePCfg.setRebalanceBatchSize(1);
- cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
+ cacheRCfg.setRebalanceBatchSize(1);
+ cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
[09/16] ignite git commit: Yardstick properties file fixed
Posted by sb...@apache.org.
Yardstick properties file fixed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c28cec1c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c28cec1c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c28cec1c
Branch: refs/heads/ignite-1093-2
Commit: c28cec1cd3b893adb4b5168af86d67bf5a998854
Parents: b6139f8
Author: agura <ag...@gridgain.com>
Authored: Fri Sep 18 14:50:19 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Sep 18 14:50:19 2015 +0300
----------------------------------------------------------------------
modules/yardstick/config/benchmark-put-indexed-val.properties | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c28cec1c/modules/yardstick/config/benchmark-put-indexed-val.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-put-indexed-val.properties b/modules/yardstick/config/benchmark-put-indexed-val.properties
index 8f3a505..e81ae6f 100644
--- a/modules/yardstick/config/benchmark-put-indexed-val.properties
+++ b/modules/yardstick/config/benchmark-put-indexed-val.properties
@@ -21,7 +21,7 @@
# JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
-JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false" \
+JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false \
-XX:+UseParNewGC \
-XX:+UseConcMarkSweepGC \
-XX:+UseTLAB \
[02/16] ignite git commit: ignite-1.4 Fixed Visor cmd options.
Posted by sb...@apache.org.
ignite-1.4 Fixed Visor cmd options.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8c0b308
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8c0b308
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8c0b308
Branch: refs/heads/ignite-1093-2
Commit: b8c0b308a7f02f0495315e280936f6bacd170e44
Parents: 5cfb6e6
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Sep 17 17:17:05 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Sep 17 17:17:05 2015 +0700
----------------------------------------------------------------------
.../scala/org/apache/ignite/visor/commands/VisorConsole.scala | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8c0b308/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index 2abe8a7..6d91b05 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -105,6 +105,12 @@ class VisorConsole {
val batchCommand = argValue("e", argLst)
cfgFile.foreach(cfg => {
+ if (cfg.trim.isEmpty) {
+ visor.warn("Expected path to configuration after \"-cfg\" option.")
+
+ visor.quit()
+ }
+
if (batchFile.isDefined || batchCommand.isDefined) {
visor.warn("Options can't contains both -cfg and one of -b or -e options.")
[11/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e0eafed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e0eafed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e0eafed
Branch: refs/heads/ignite-1093-2
Commit: 9e0eafed477a3e09e4527f436be3de57d59c9848
Parents: 8c1aa26
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 16:52:38 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 16:52:38 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 126 +++++++++----------
.../dht/preloader/GridDhtPartitionSupplier.java | 8 +-
.../GridDhtPartitionSupplyMessageV2.java | 70 +++++------
3 files changed, 89 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 498b16d..596ec2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
@@ -189,11 +190,11 @@ public class GridDhtPartitionDemander {
}
/**
- * @param topVer Topology version.
+ * @param fut Future.
* @return {@code True} if topology changed.
*/
- private boolean topologyChanged(AffinityTopologyVersion topVer) {
- return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+ private boolean topologyChanged(SyncFuture fut) {
+ return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut;
}
/**
@@ -218,7 +219,7 @@ public class GridDhtPartitionDemander {
try {
SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
- if (!topologyChanged(fut.assigns.topologyVersion()))
+ if (!topologyChanged(fut))
wFut.get();
else {
fut.cancel();
@@ -257,8 +258,6 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
- syncFut = fut;
-
if (!oldFut.isDummy())
oldFut.cancel();
else
@@ -268,10 +267,15 @@ public class GridDhtPartitionDemander {
}
});
- if (fut.doneIfEmpty())// Done in case empty assigns.
+ syncFut = fut;
+
+ if (assigns.isEmpty()) {
+ fut.doneIfEmpty();
+
return;
+ }
- if (topologyChanged(fut.topologyVersion())) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -298,7 +302,7 @@ public class GridDhtPartitionDemander {
log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
", rebalanceOrder=" + rebalanceOrder + ']');
- if (!topologyChanged(fut.topologyVersion()))
+ if (!topologyChanged(fut))
oFut.get();
else {
fut.cancel();
@@ -323,7 +327,7 @@ public class GridDhtPartitionDemander {
}
}
- requestPartitions(fut);
+ requestPartitions(fut, assigns);
}
});
@@ -358,13 +362,9 @@ public class GridDhtPartitionDemander {
/**
* @param fut Future.
*/
- private void requestPartitions(SyncFuture fut) {
- final GridDhtPreloaderAssignments assigns = fut.assigns;
-
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
+ private void requestPartitions(SyncFuture fut, GridDhtPreloaderAssignments assigns) {
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -411,7 +411,7 @@ public class GridDhtPartitionDemander {
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
try {
- if (!topologyChanged(topVer))
+ if (!topologyChanged(fut))
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
else
fut.cancel();
@@ -498,16 +498,12 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = syncFut;
- if (!fut.topologyVersion().equals(topVer))//will check topology changed at loop.
- return;
-
ClusterNode node = cctx.node(id);
- if (node == null) {
- fut.cancel(id);
+ assert node != null;
+ if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut))
return;
- }
if (log.isDebugEnabled())
log.debug("Received supply message: " + supply);
@@ -527,7 +523,7 @@ public class GridDhtPartitionDemander {
try {
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -609,7 +605,10 @@ public class GridDhtPartitionDemander {
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
- GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+ d.timeout(cctx.config().getRebalanceTimeout());
if (d != null) {
// Create copy.
@@ -618,7 +617,7 @@ public class GridDhtPartitionDemander {
nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- if (!topologyChanged(topVer)) {
+ if (!topologyChanged(fut)) {
// Send demand message.
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
@@ -761,8 +760,12 @@ public class GridDhtPartitionDemander {
/** Lock. */
private final Lock lock = new ReentrantLock();
- /** Assignments. */
- private final GridDhtPreloaderAssignments assigns;
+ /** Exchange future. */
+ @GridToStringExclude
+ private final GridDhtPartitionsExchangeFuture exchFut;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
/**
* @param assigns Assigns.
@@ -774,7 +777,10 @@ public class GridDhtPartitionDemander {
GridCacheContext<?, ?> cctx,
IgniteLogger log,
boolean sentStopEvnt) {
- this.assigns = assigns;
+ assert assigns != null;
+
+ this.exchFut = assigns.exchangeFuture();
+ this.topVer = assigns.topologyVersion();
this.cctx = cctx;
this.log = log;
this.sendStoppedEvnt = sentStopEvnt;
@@ -792,7 +798,8 @@ public class GridDhtPartitionDemander {
* Dummy future. Will be done by real one.
*/
public SyncFuture() {
- this.assigns = null;
+ this.exchFut = null;
+ this.topVer = null;
this.cctx = null;
this.log = null;
this.sendStoppedEvnt = false;
@@ -802,14 +809,14 @@ public class GridDhtPartitionDemander {
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion() {
- return assigns != null ? assigns.topologyVersion() : null;
+ return topVer;
}
/**
* @return Is dummy (created at demander creation).
*/
private boolean isDummy() {
- return assigns == null;
+ return topVer == null;
}
/**
@@ -828,41 +835,22 @@ public class GridDhtPartitionDemander {
}
/**
- * @param node Node.
- * @return Demand message.
- */
- private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
- if (isDone())
- return null;
-
- return assigns.get(node);
- }
-
- /**
- * @return future is done.
+ *
*/
- private boolean doneIfEmpty() {
+ private void doneIfEmpty() {
lock.lock();
try {
if (isDone())
- return true;
-
- if (assigns.isEmpty()) {
- assert remaining.isEmpty();
+ return;
- if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
- if (log.isDebugEnabled())
- log.debug("Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + assigns.topologyVersion() + "]");
+ assert remaining.isEmpty();
- checkIsDone();
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing is not required [cache=" + cctx.name() +
+ ", topology=" + topVer + "]");
- return true;
- }
- else {
- return false;
- }
+ checkIsDone();
}
finally {
lock.unlock();
@@ -953,7 +941,7 @@ public class GridDhtPartitionDemander {
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- assigns.exchangeFuture().discoveryEvent());
+ exchFut.discoveryEvent());
Collection<Integer> parts = remaining.get(nodeId).get2();
@@ -1011,15 +999,15 @@ public class GridDhtPartitionDemander {
}
if (!m.isEmpty()) {
- U.log(log,("Reassigning partitions that were missed: " + m));
+ U.log(log, ("Reassigning partitions that were missed: " + m));
- cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+ cctx.shared().exchange().forceDummyExchange(true, exchFut);
}
cctx.shared().exchange().scheduleResendPartitions();
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
onDone();
}
@@ -1163,7 +1151,7 @@ public class GridDhtPartitionDemander {
// Get the same collection that will be sent in the message.
Collection<Integer> remaining = d.partitions();
- if (topologyChanged(topVer))
+ if (topologyChanged(fut))
return missed;
cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@@ -1195,7 +1183,7 @@ public class GridDhtPartitionDemander {
// While.
// =====
- while (!topologyChanged(topVer)) {
+ while (!topologyChanged(fut)) {
SupplyMessage s = poll(msgQ, timeout);
// If timed out.
@@ -1358,7 +1346,7 @@ public class GridDhtPartitionDemander {
}
}
}
- while (retry && !topologyChanged(topVer));
+ while (retry && !topologyChanged(fut));
return missed;
}
@@ -1375,13 +1363,13 @@ public class GridDhtPartitionDemander {
demandLock.readLock().lock();
try {
- GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+ GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
- AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+ AffinityTopologyVersion topVer = fut.topVer;
Collection<Integer> missed = new HashSet<>();
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index ee01158..0641612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -185,7 +185,7 @@ class GridDhtPartitionSupplier {
if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
return;
- GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+ GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
d.updateSequence(), cctx.cacheId(), d.topologyVersion());
ClusterNode node = cctx.discovery().node(id);
@@ -289,7 +289,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
@@ -365,7 +365,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
@@ -477,7 +477,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 01056ac..17ebb26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -17,17 +17,30 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Partition supply message.
@@ -36,9 +49,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/** */
private static final long serialVersionUID = 0L;
- /** Worker ID. */
- private int workerId = -1;
-
/** Update sequence. */
private long updateSeq;
@@ -66,17 +76,14 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
private int msgSize;
/**
- * @param workerId Worker ID.
* @param updateSeq Update sequence for this node.
* @param cacheId Cache ID.
*/
- GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
- assert workerId >= 0;
+ GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
assert updateSeq > 0;
this.cacheId = cacheId;
this.updateSeq = updateSeq;
- this.workerId = workerId;
this.topVer = topVer;
}
@@ -98,13 +105,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
}
/**
- * @return Worker ID.
- */
- int workerId() {
- return workerId;
- }
-
- /**
* @return Update sequence.
*/
long updateSequence() {
@@ -255,7 +255,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
for (CacheEntryInfoCollection col : infos().values()) {
- List<GridCacheEntryInfo> entries = col.infos();
+ List<GridCacheEntryInfo> entries = col.infos();
for (int i = 0; i < entries.size(); i++)
entries.get(i).unmarshal(cacheCtx, ldr);
@@ -320,12 +320,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
writer.incrementState();
- case 9:
- if (!writer.writeInt("workerId", workerId))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -390,17 +384,9 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
reader.incrementState();
- case 9:
- workerId = reader.readInt("workerId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
- return true;
+ return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
}
/** {@inheritDoc} */
@@ -410,7 +396,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 9;
}
/** {@inheritDoc} */
[05/16] ignite git commit: minor
Posted by sb...@apache.org.
minor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d64fc9d1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d64fc9d1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d64fc9d1
Branch: refs/heads/ignite-1093-2
Commit: d64fc9d105c66c08234d7bdf72046128456620a5
Parents: 3676cbe
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Sep 17 18:03:57 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Sep 17 18:03:57 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d64fc9d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index d9f6840..0cbad48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -418,4 +418,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
-}
\ No newline at end of file
+}
[04/16] ignite git commit: Test for ignite-973.
Posted by sb...@apache.org.
Test for ignite-973.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3676cbe7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3676cbe7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3676cbe7
Branch: refs/heads/ignite-1093-2
Commit: 3676cbe7f5f5f73199487318d6841e50a1f73496
Parents: 585761f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 17 17:52:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 17 17:52:24 2015 +0300
----------------------------------------------------------------------
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++++++++++
2 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3676cbe7/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index edf7c52..be3f0e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -633,11 +633,23 @@ public final class GridTestUtils {
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task) {
+ return runAsync(task, "async-runner");
+ }
+
+ /**
+ * Runs callable task asyncronously.
+ *
+ * @param task Callable.
+ * @param threadName Thread name.
+ * @return Future with task result.
+ */
+ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+ public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task, String threadName) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to start new threads (test is being stopped).");
try {
- final GridTestSafeThreadFactory thrFactory = new GridTestSafeThreadFactory("async-runner");
+ final GridTestSafeThreadFactory thrFactory = new GridTestSafeThreadFactory(threadName);
final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
@Override public boolean cancel() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3676cbe7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
new file mode 100644
index 0000000..25c3b81
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheIndexStreamerTest extends GridCommonAbstractTest {
+ /** */
+ private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStreamer() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final int KEYS= 10_000;
+
+ try {
+ IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+ for (int i = 0; i < 1; i++)
+ streamer.addData(rnd.nextInt(KEYS), String.valueOf(i));
+ }
+ }
+
+ return null;
+ }
+ }, "streamer-thread");
+
+ IgniteInternalFuture updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ for (int i = 0; i < 100; i++) {
+ Integer key = rnd.nextInt(KEYS);
+
+ cache.put(key, String.valueOf(key));
+
+ cache.remove(key);
+ }
+ }
+
+ return null;
+ }
+ }, 1, "update-thread");
+
+ U.sleep(30_000);
+
+ stop.set(true);
+
+ streamerFut.get();
+ updateFut.get();
+ }
+ finally {
+ stop.set(true);
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ ccfg.setOffHeapMaxMemory(0);
+ ccfg.setBackups(1);
+ ccfg.setIndexedTypes(Integer.class, String.class);
+
+ return ccfg;
+ }
+
+
+}
[08/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65a9c289
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65a9c289
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65a9c289
Branch: refs/heads/ignite-1093-2
Commit: 65a9c289b4aeaa1b8d984328072f8458e0d0d504
Parents: 7d51f61
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 12:37:25 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 12:37:25 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/preloader/GridDhtPartitionDemander.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/65a9c289/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 3c5a2f2..498b16d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -852,7 +852,8 @@ public class GridDhtPartitionDemander {
assert remaining.isEmpty();
if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
- U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing is not required [cache=" + cctx.name() +
", topology=" + assigns.topologyVersion() + "]");
checkIsDone();
@@ -1010,8 +1011,7 @@ public class GridDhtPartitionDemander {
}
if (!m.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + m);
+ U.log(log,("Reassigning partitions that were missed: " + m));
cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
}
[16/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5da2ca9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5da2ca9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5da2ca9
Branch: refs/heads/ignite-1093-2
Commit: e5da2ca91e5ded08c6aa7d17787702a5900d19be
Parents: 5998af5
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 22 13:46:29 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 22 13:46:29 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 1 +
.../GridDhtPartitionDemandMessage.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 21 ++++++++------------
.../dht/preloader/GridDhtPartitionSupplier.java | 6 ++----
.../ignite/internal/util/lang/GridTuple4.java | 2 +-
5 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5da2ca9/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 92d9ab1..7f1fb86 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -401,6 +401,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
rebalanceDelay = cc.getRebalanceDelay();
rebalanceOrder = cc.getRebalanceOrder();
rebalancePoolSize = cc.getRebalanceThreadPoolSize();
+ rebalanceBatchesCount = cc.getRebalanceBatchesCount();
rebalanceTimeout = cc.getRebalanceTimeout();
rebalanceThrottle = cc.getRebalanceThrottle();
readFromBackup = cc.isReadFromBackup();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5da2ca9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index b588372..06ac54b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -327,7 +327,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
+ return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
super.toString());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5da2ca9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index a2f8c01..7f2dc48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -610,21 +610,16 @@ public class GridDhtPartitionDemander {
d.timeout(cctx.config().getRebalanceTimeout());
- if (d != null) {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+ d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
-
- if (!topologyChanged(fut)) {
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
- }
- else
- fut.cancel();
+ if (!topologyChanged(fut)) {
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
+ else
+ fut.cancel();
+
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5da2ca9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 1d8572a..d33dc5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -200,12 +200,10 @@ class GridDhtPartitionSupplier {
try {
SupplyContext sctx = scMap.get(scId);
- if (sctx == null && d.partitions().isEmpty())
+ if (sctx == null && d.partitions() == null)
return;
- if (sctx != null && !d.partitions().isEmpty()) {
- assert false;
- }
+ assert !(sctx != null && d.partitions() != null);
long bCnt = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5da2ca9/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
index 835cdcb..c95a859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
@@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza
GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o;
- return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
+ return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
}
/** {@inheritDoc} */
[07/16] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d51f61c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d51f61c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d51f61c
Branch: refs/heads/ignite-1093-2
Commit: 7d51f61c29a96475e6662484a6dae474b3f8f609
Parents: ff0e2e1
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 12:18:43 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 12:18:43 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 1 +
.../dht/preloader/GridDhtPartitionSupplier.java | 2 +
.../GridCacheRebalancingSyncSelfTest.java | 48 +++++++++++---------
4 files changed, 30 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index e77b540..92d9ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1084,7 +1084,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
- this.rebalanceBatchSize = rebalanceBatchSize;
+ this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
return this;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fbe57dc..3c5a2f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -409,6 +409,7 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+
try {
if (!topologyChanged(topVer))
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index fb9f796..ee01158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -509,6 +509,8 @@ class GridDhtPartitionSupplier {
}
}
+ scMap.remove(scId);
+
reply(node, d, s);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 3366381..39f5d4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -95,6 +95,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cachePCfg.setCacheMode(CacheMode.PARTITIONED);
cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cachePCfg.setBackups(1);
+ cachePCfg.setRebalanceBatchSize(1);
+ cachePCfg.setRebalanceBatchesCount(1);
CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
@@ -108,6 +110,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
cacheRCfg.setCacheMode(CacheMode.REPLICATED);
cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cachePCfg.setRebalanceBatchSize(1);
+ cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
@@ -123,19 +127,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
*/
- protected void generateData(Ignite ignite) {
- generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
- generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
- generateData(ignite, CACHE_NAME_DHT_REPLICATED);
- generateData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+ protected void generateData(Ignite ignite, int from) {
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
}
/**
* @param ignite Ignite.
*/
- protected void generateData(Ignite ignite, String name) {
+ protected void generateData(Ignite ignite, String name, int from) {
try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) {
- for (int i = 0; i < TEST_SIZE; i++) {
+ for (int i = from; i < from + TEST_SIZE; i++) {
if (i % (TEST_SIZE / 10) == 0)
log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
@@ -150,11 +154,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
* @param ignite Ignite.
* @throws IgniteCheckedException Exception.
*/
- protected void checkData(Ignite ignite) throws IgniteCheckedException {
- checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
- checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
- checkData(ignite, CACHE_NAME_DHT_REPLICATED);
- checkData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+ protected void checkData(Ignite ignite, int from) throws IgniteCheckedException {
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
}
/**
@@ -162,13 +166,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
* @param name Cache name.
* @throws IgniteCheckedException Exception.
*/
- protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
- for (int i = 0; i < TEST_SIZE; i++) {
+ protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException {
+ for (int i = from; i < from + TEST_SIZE; i++) {
if (i % (TEST_SIZE / 10) == 0)
log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) :
- "value " + i + name.hashCode() + " does not match (" + ignite.cache(name).get(i) + ")";
+ "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
}
}
@@ -178,7 +182,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testSimpleRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");
@@ -204,7 +208,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long spend = (System.currentTimeMillis() - start) / 1000;
- checkData(grid(1));
+ checkData(grid(1), 0);
log.info("Spend " + spend + " seconds to rebalance entries.");
@@ -260,7 +264,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testComplexRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");
@@ -351,7 +355,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long spend = (System.currentTimeMillis() - start) / 1000;
- checkData(grid(4));
+ checkData(grid(4), 0);
log.info("Spend " + spend + " seconds to rebalance entries.");
@@ -370,7 +374,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
- generateData(ignite);
+ generateData(ignite, 0);
startGrid(1);
@@ -378,7 +382,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
stopGrid(0);
- checkData(grid(1));
+ checkData(grid(1), 0);
stopAllGrids();
}
@@ -389,7 +393,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testNodeFailedAtRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");
[06/16] ignite git commit: Merge branch 'ignite-1.4'
Posted by sb...@apache.org.
Merge branch 'ignite-1.4'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b6139f86
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b6139f86
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b6139f86
Branch: refs/heads/ignite-1093-2
Commit: b6139f86f8994dbbdd8acabedcd65b89e2b5aa92
Parents: c311c3c d64fc9d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 18 11:44:13 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 18 11:44:13 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 7 -
.../processors/cache/GridCacheContext.java | 6 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 ++++---
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 34 ++++-
.../GridCachePartitionExchangeManager.java | 41 ++++--
.../processors/cache/GridCacheProcessor.java | 28 ++--
.../GridDistributedLockResponse.java | 6 +-
.../GridDistributedTxPrepareResponse.java | 6 +-
.../distributed/dht/GridDhtTopologyFuture.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 +-
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++--
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 19 ++-
.../distributed/near/GridNearGetResponse.java | 6 +-
.../distributed/near/GridNearLockFuture.java | 26 +++-
.../near/GridNearOptimisticTxPrepareFuture.java | 20 ++-
.../near/GridNearTxFinishResponse.java | 6 +-
.../cache/query/GridCacheQueryResponse.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 ++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++++++++++
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
.../ignite/visor/commands/VisorConsole.scala | 8 +-
36 files changed, 450 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
[15/16] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-1093-2
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-1093-2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5998af56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5998af56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5998af56
Branch: refs/heads/ignite-1093-2
Commit: 5998af560af64c15052e33f6f2be71036d5854d5
Parents: 93caa0b 1021d4e
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 21 13:21:24 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 21 13:21:24 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 15 +-
.../apache/ignite/IgniteAtomicReference.java | 9 +-
.../org/apache/ignite/IgniteAtomicSequence.java | 9 +-
.../org/apache/ignite/IgniteAtomicStamped.java | 13 +-
.../apache/ignite/internal/IgniteKernal.java | 7 -
.../processors/cache/GridCacheContext.java | 6 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 ++++---
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 34 ++++-
.../GridCachePartitionExchangeManager.java | 41 ++++--
.../processors/cache/GridCacheProcessor.java | 28 ++--
.../GridDistributedLockResponse.java | 6 +-
.../GridDistributedTxPrepareResponse.java | 6 +-
.../distributed/dht/GridDhtTopologyFuture.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 ++-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 +-
.../colocated/GridDhtColocatedLockFuture.java | 44 ++++--
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 19 ++-
.../distributed/near/GridNearGetResponse.java | 6 +-
.../distributed/near/GridNearLockFuture.java | 26 +++-
.../near/GridNearOptimisticTxPrepareFuture.java | 20 ++-
.../near/GridNearTxFinishResponse.java | 6 +-
.../cache/query/GridCacheQueryResponse.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 ++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +++-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++++++++++
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
.../ignite/visor/commands/VisorConsole.scala | 8 +-
.../config/benchmark-put-indexed-val.properties | 2 +-
41 files changed, 468 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5998af56/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5998af56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5998af56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 91e3ea4,34c571c..e690704
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -435,12 -398,14 +438,18 @@@ public class GridCachePartitionExchange
}
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(err);
+ f.onDone(stopErr);
+
+ for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+ f.onDone(stopErr);
+
+ if (locExchFut != null)
+ locExchFut.onDone(stopErr);
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+ }
+
U.cancel(exchWorker);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/5998af56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------