You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/11/09 08:54:41 UTC
[39/49] ignite git commit: Ignite-1093 "Rebalancing with default
parameters is very slow" fixes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 998c720..c634ff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -17,15 +17,18 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -47,27 +50,42 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
/**
* DHT cache preloader.
*/
public class GridDhtPreloader extends GridCachePreloaderAdapter {
+ /**
+ * Rebalancing was refactored in version 1.5.0, but backward compatibility with the previous implementation was kept.
+ * A node automatically chooses the communication protocol depending on the remote node's version.
+ * Backward compatibility may be removed in Ignite 2.x.
+ */
+ public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
+
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
@@ -81,10 +99,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool supplyPool;
+ private GridDhtPartitionSupplier supplier;
/** Partition demanders. */
- private GridDhtPartitionDemandPool demandPool;
+ private GridDhtPartitionDemander demander;
/** Start future. */
private GridFutureAdapter<Object> startFut;
@@ -92,10 +110,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Busy lock to prevent activities from accessing exchanger while it's stopping. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
+ /** Demand lock. */
+ private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
/** Pending affinity assignment futures. */
private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
new ConcurrentHashMap8<>();
+ /** Partitions queued for asynchronous eviction. */
+ private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>();
+
+ /** Eviction ownership flag: set to 1 while an eviction task is draining the queue, 0 otherwise. */
+ private final AtomicInteger partsEvictOwning = new AtomicInteger();
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -179,8 +206,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+ supplier = new GridDhtPartitionSupplier(cctx);
+ demander = new GridDhtPartitionDemander(cctx, busyLock);
+
+ supplier.start();
+ demander.start();
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -197,19 +227,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
final long startTopVer = loc.order();
topVer.setIfGreater(startTopVer);
-
- supplyPool.start();
- demandPool.start();
}
/** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
super.preloadPredicate(preloadPred);
- assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+ assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
- supplyPool.preloadPredicate(preloadPred);
- demandPool.preloadPredicate(preloadPred);
+ supplier.preloadPredicate(preloadPred);
+ demander.preloadPredicate(preloadPred);
}
/** {@inheritDoc} */
@@ -223,37 +250,109 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
// Acquire write busy lock.
busyLock.writeLock().lock();
- if (supplyPool != null)
- supplyPool.stop();
+ if (supplier != null)
+ supplier.stop();
- if (demandPool != null)
- demandPool.stop();
+ if (demander != null)
+ demander.stop();
top = null;
}
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
- if (err == null) {
+ if (err == null)
startFut.onDone();
+ else
+ startFut.onDone(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ demander.updateLastExchangeFuture(lastFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+ supplier.onTopologyChanged(topVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ // No assignments for disabled preloader.
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ if (!cctx.rebalanceEnabled())
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ int partCnt = cctx.affinity().partitions();
- final long start = U.currentTimeMillis();
+ assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+ exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ ", topVer=" + top.topologyVersion() + ']';
- final CacheConfiguration cfg = cctx.config();
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
- demandPool.syncFuture().listen(new CI1<Object>() {
- @Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
- "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ for (int p = 0; p < partCnt; p++) {
+ if (cctx.shared().exchange().hasPendingExchange()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+ exchFut.exchangeId());
+
+ break;
+ }
+
+ // If partition belongs to local node.
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+ assert part.id() == p;
+
+ if (part.state() != MOVING) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+ continue; // For.
+ }
+
+ Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+ if (picked.isEmpty()) {
+ top.own(part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+ cctx.events().addPreloadEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+ discoEvt.type(), discoEvt.timestamp());
}
- });
+
+ if (log.isDebugEnabled())
+ log.debug("Owning partition as there are no other owners: " + part);
+ }
+ else {
+ ClusterNode n = F.rand(picked);
+
+ GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchFut.exchangeId().topologyVersion(),
+ cctx.cacheId()));
+ }
+
+ msg.addPartition(p);
+ }
}
}
- else
- startFut.onDone(err);
+
+ return assigns;
}
/** {@inheritDoc} */
@@ -267,24 +366,77 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.set(topVer0);
}
- /** {@inheritDoc} */
- @Override public void onExchangeFutureAdded() {
- demandPool.onExchangeFutureAdded();
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Picked owners.
+ */
+ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+ int affCnt = affNodes.size();
+
+ Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+ int rmtCnt = rmts.size();
+
+ if (rmtCnt <= affCnt)
+ return rmts;
+
+ List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+ // Sort in descending order, so nodes with higher order will be first.
+ Collections.sort(sorted, CU.nodeComparator(false));
+
+ // Pick newest nodes.
+ return sorted.subList(0, affCnt);
+ }
+
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
}
/** {@inheritDoc} */
- @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- demandPool.updateLastExchangeFuture(lastFut);
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+ if (!enterBusy())
+ return;
+
+ try {
+ demandLock.readLock().lock();
+ try {
+ demander.handleSupplyMessage(idx, id, s);
+ }
+ finally {
+ demandLock.readLock().unlock();
+ }
+ }
+ finally {
+ leaveBusy();
+ }
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- return demandPool.assign(exchFut);
+ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+ if (!enterBusy())
+ return;
+
+ try {
+ supplier.handleDemandMessage(idx, id, d);
+ }
+ finally {
+ leaveBusy();
+ }
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
- demandPool.addAssignments(assignments, forcePreload);
+ @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+ boolean forcePreload, Collection<String> caches, int cnt) {
+ return demander.addAssignments(assignments, forcePreload, caches, cnt);
}
/**
@@ -296,7 +448,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+ return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
}
/**
@@ -580,12 +737,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void forcePreload() {
- demandPool.forcePreload();
+ demander.forcePreload();
}
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- demandPool.unwindUndeploys();
+ demandLock.writeLock().lock();
+
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
}
/**
@@ -607,6 +771,44 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
+ @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+ partsToEvict.add(part);
+
+ if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
+ cctx.closures().callLocalSafe(new GPC<Boolean>() {
+ @Override public Boolean call() {
+ boolean locked = true;
+
+ while (locked || !partsToEvict.isEmptyx()) {
+ if (!locked && !partsEvictOwning.compareAndSet(0, 1))
+ return false;
+
+ try {
+ GridDhtLocalPartition part = partsToEvict.poll();
+
+ if (part != null)
+ part.tryEvict();
+ }
+ finally {
+ if (!partsToEvict.isEmptyx())
+ locked = true;
+ else {
+ boolean res = partsEvictOwning.compareAndSet(1, 0);
+
+ assert res;
+
+ locked = false;
+ }
+ }
+ }
+
+ return true;
+ }
+ }, /*system pool*/ true);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void dumpDebugInfo() {
if (!forceKeyFuts.isEmpty()) {
U.warn(log, "Pending force key futures [cache=" + cctx.name() +"]:");
@@ -621,6 +823,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
U.warn(log, ">>> " + fut);
}
+
+ supplier.dumpDebugInfo();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 7c5e97c..810bd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1292,6 +1292,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
catch (IgniteCheckedException e) {
U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
}
+ finally {
+ ctx.cache().context().txContextReset();
+ }
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 26a41de..9315d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,7 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
- return;
+ selfOccupied = true;
+
+ continue;
}
// Only process 1st response and ignore following ones. This scenario
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
index 835cdcb..c95a859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
@@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza
GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o;
- return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
+ return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6254605..854ce95 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* <p>
* This method is intended for test purposes only.
*/
- void simulateNodeFailure() {
+ protected void simulateNodeFailure() {
impl.simulateNodeFailure();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index 1b2b84d..f4423f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -87,7 +87,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
}
/** Test key 1. */
- public static class TestKey implements Externalizable {
+ protected static class TestKey implements Externalizable {
/** Field. */
@QuerySqlField(index = true)
private String field;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index cadd03f..fe0b84e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
@@ -46,12 +51,6 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionRollbackException;
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -192,13 +191,13 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
boolean backup,
final boolean commit
) throws Exception {
- startGrids(gridCount());
- awaitPartitionMapExchange();
+ try {
+ startGrids(gridCount());
+ awaitPartitionMapExchange();
- for (int i = 0; i < gridCount(); i++)
- info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
+ for (int i = 0; i < gridCount(); i++)
+ info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
- try {
final Ignite ignite = ignite(0);
final IgniteCache<Object, Object> cache = ignite.cache(null).withNoRetries();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..7759c70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+
+/**
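+ * Runs the inherited rebalancing tests with every cache switched to {@link CacheRebalanceMode#ASYNC}.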
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+ }
+
+ return iCfg;
+ }
+
+ /**
+ * @throws Exception Exception.
+ */
+ public void testNodeFailedAtRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ startGrid(1);
+
+ GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)grid(1).context().
+ cache().internalCache(CACHE_NAME_DHT_REPLICATED).preloader().rebalanceFuture();
+
+ fut.get();
+
+ U.sleep(10);
+
+ ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure();
+
+ waitForRebalancing(0, 3);
+
+ checkSupplyContextMapIsEmpty();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..8c5cd40
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Map;
+import java.util.Random;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
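+/**
+ * Tests cache rebalancing in {@link CacheRebalanceMode#SYNC} mode while nodes join and leave the topology.
+ */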
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+ /** IP finder assigned to the discovery SPI of every grid configured by this test. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of consecutive integer keys written to, and verified in, each cache. */
+ private static int TEST_SIZE = 100_000;
+
+ /** Name of the first partitioned cache. */
+ protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+
+ /** Name of the second partitioned cache. */
+ protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+
+ /** Name of the first replicated cache. */
+ protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+
+ /** Name of the second replicated cache. */
+ protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+
+ /** Stop flag polled by the load threads of {@code testLoadRebalancing}; also raised last by the first thread of {@code testComplexRebalancing}. */
+ private volatile boolean concurrentStartFinished;
+
+ /** Raised in {@code testComplexRebalancing} once grids 3 and 4 have been started. */
+ private volatile boolean concurrentStartFinished2;
+
+ /** Raised in {@code testComplexRebalancing} once the concurrent data generation has completed. */
+ private volatile boolean concurrentStartFinished3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+        discoSpi.setForceServerMode(true);
+
+        // Only the grid with index 10 is switched to client mode.
+        if (getTestGridName(10).equals(gridName))
+            cfg.setClientMode(true);
+
+        // First partitioned cache: rebalance batch size 1 with a single prefetched batch.
+        CacheConfiguration<Integer, Integer> partCfg = new CacheConfiguration<>();
+
+        partCfg.setName(CACHE_NAME_DHT_PARTITIONED);
+        partCfg.setCacheMode(CacheMode.PARTITIONED);
+        partCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        partCfg.setBackups(1);
+        partCfg.setRebalanceBatchSize(1);
+        partCfg.setRebalanceBatchesPrefetchCount(1);
+        partCfg.setRebalanceOrder(2);
+
+        // Second partitioned cache: same rebalance order, default batch settings.
+        // A rebalance delay (5000) is deliberately not configured here: known issue, possible deadlock
+        // in case of low priority cache rebalancing delayed.
+        CacheConfiguration<Integer, Integer> partCfg2 = new CacheConfiguration<>();
+
+        partCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2);
+        partCfg2.setCacheMode(CacheMode.PARTITIONED);
+        partCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        partCfg2.setBackups(1);
+        partCfg2.setRebalanceOrder(2);
+
+        // First replicated cache: rebalance batch size 1 with the maximum prefetch count.
+        CacheConfiguration<Integer, Integer> replCfg = new CacheConfiguration<>();
+
+        replCfg.setName(CACHE_NAME_DHT_REPLICATED);
+        replCfg.setCacheMode(CacheMode.REPLICATED);
+        replCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        replCfg.setRebalanceBatchSize(1);
+        replCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE);
+
+        // Shmem fail fix for Integer.MAX_VALUE.
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        // Second replicated cache: rebalance order 4.
+        CacheConfiguration<Integer, Integer> replCfg2 = new CacheConfiguration<>();
+
+        replCfg2.setName(CACHE_NAME_DHT_REPLICATED_2);
+        replCfg2.setCacheMode(CacheMode.REPLICATED);
+        replCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        replCfg2.setRebalanceOrder(4);
+
+        cfg.setCacheConfiguration(partCfg, partCfg2, replCfg, replCfg2);
+
+        cfg.setRebalanceThreadPoolSize(2);
+
+        return cfg;
+    }
+
+    /**
+     * Fills each of the four test caches by delegating to the per-cache overload.
+     *
+     * @param ignite Node used to access the caches.
+     * @param from First key to write.
+     * @param iter Iteration number mixed into every generated value.
+     */
+    protected void generateData(Ignite ignite, int from, int iter) {
+        String[] cacheNames = {
+            CACHE_NAME_DHT_PARTITIONED,
+            CACHE_NAME_DHT_PARTITIONED_2,
+            CACHE_NAME_DHT_REPLICATED,
+            CACHE_NAME_DHT_REPLICATED_2
+        };
+
+        for (String cacheName : cacheNames)
+            generateData(ignite, cacheName, from, iter);
+    }
+
+    /**
+     * Puts {@code TEST_SIZE} consecutive integer keys, starting at {@code from}, into the given cache.
+     * The value stored for key {@code k} is {@code k + name.hashCode() + iter}.
+     *
+     * @param ignite Node used to access the cache.
+     * @param name Cache name.
+     * @param from First key to write.
+     * @param iter Iteration number mixed into every generated value.
+     */
+    protected void generateData(Ignite ignite, String name, int from, int iter) {
+        int logStep = TEST_SIZE / 10;
+
+        for (int done = 0; done < TEST_SIZE; done++) {
+            // Progress is counted from the start of the range so that it stays within 0..100% for any 'from'.
+            if (done % logStep == 0)
+                log.info("Prepared " + done * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+            int key = from + done;
+
+            ignite.cache(name).put(key, key + name.hashCode() + iter);
+        }
+    }
+
+    /**
+     * Verifies each of the four test caches by delegating to the per-cache overload.
+     *
+     * @param ignite Node used to access the caches.
+     * @param from First key to verify.
+     * @param iter Iteration number the stored values are expected to carry.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+        String[] cacheNames = {
+            CACHE_NAME_DHT_PARTITIONED,
+            CACHE_NAME_DHT_PARTITIONED_2,
+            CACHE_NAME_DHT_REPLICATED,
+            CACHE_NAME_DHT_REPLICATED_2
+        };
+
+        for (String cacheName : cacheNames)
+            checkData(ignite, cacheName, from, iter);
+    }
+
+    /**
+     * Asserts that the given cache holds the expected value for {@code TEST_SIZE} consecutive integer keys
+     * starting at {@code from}. The value expected for key {@code k} is {@code k + name.hashCode() + iter}.
+     *
+     * @param ignite Node used to access the cache.
+     * @param name Cache name.
+     * @param from First key to verify.
+     * @param iter Iteration number the stored values are expected to carry.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+        int logStep = TEST_SIZE / 10;
+
+        for (int done = 0; done < TEST_SIZE; done++) {
+            // Progress is counted from the start of the range so that it stays within 0..100% for any 'from'.
+            if (done % logStep == 0)
+                log.info("<" + name + "> Checked " + done * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+            int key = from + done;
+
+            Integer exp = key + name.hashCode() + iter;
+
+            // Read the entry once, so the value reported on failure is the one that was actually compared.
+            Object actual = ignite.cache(name).get(key);
+
+            assert exp.equals(actual) : key + " value " + exp + " does not match (" + actual + ")";
+        }
+    }
+
+ /**
+ * Starts and stops nodes one at a time, waiting for rebalancing after every topology change, and finally
+ * verifies the data on the only node left running.
+ *
+ * @throws Exception Exception
+ */
+ public void testSimpleRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+
+ // Second node started: both nodes are awaited on topology version 2.
+ waitForRebalancing(0, 2);
+ waitForRebalancing(1, 2);
+
+ stopGrid(0);
+
+ // Node 0 stopped: version 3 is awaited on node 1.
+ waitForRebalancing(1, 3);
+
+ startGrid(2);
+
+ // Node 2 started: version 4 is awaited on nodes 1 and 2.
+ waitForRebalancing(1, 4);
+ waitForRebalancing(2, 4);
+
+ stopGrid(2);
+
+ // Node 2 stopped: version 5 is awaited on node 1.
+ waitForRebalancing(1, 5);
+
+ // Elapsed time is taken before the data check, so verification is not included in it.
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ checkData(grid(1), 0, 0);
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+ }
+
+    /**
+     * Changes the topology while one thread keeps overwriting the first partitioned cache and another one
+     * keeps verifying it, then checks that no supply contexts are left on any node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadRebalancing() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        startGrid(1);
+
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        concurrentStartFinished = false;
+
+        // Writer: rewrites random keys with the values checkData() expects for iteration 0.
+        Thread t1 = new Thread() {
+            @Override public void run() {
+                Random rdm = new Random();
+
+                while (!concurrentStartFinished) {
+                    for (int i = 0; i < TEST_SIZE; i++) {
+                        if (i % (TEST_SIZE / 10) == 0)
+                            log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+                        int ii = rdm.nextInt(TEST_SIZE);
+
+                        ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode());
+                    }
+                }
+            }
+        };
+
+        // Reader: verifies the whole key range over and over until told to stop.
+        Thread t2 = new Thread() {
+            @Override public void run() {
+                while (!concurrentStartFinished) {
+                    try {
+                        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.error("Failed to check data during rebalancing.", e);
+                    }
+                }
+            }
+        };
+
+        t1.start();
+        t2.start();
+
+        try {
+            startGrid(2);
+            startGrid(3);
+
+            stopGrid(2);
+
+            startGrid(4);
+
+            waitForRebalancing(3, 6);
+            waitForRebalancing(4, 6);
+        }
+        finally {
+            // Always release the load threads; otherwise a failure above leaves them running after the test.
+            concurrentStartFinished = true;
+        }
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        t1.join();
+        t2.join();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        info("Time to rebalance entries: " + spend);
+    }
+
+    /**
+     * Waits for rebalancing on the given node at topology version {@code (major, minor)}.
+     *
+     * @param id Index of the node to wait on.
+     * @param major Major part of the expected topology version.
+     * @param minor Minor part of the expected topology version.
+     * @throws IgniteCheckedException If waiting failed.
+     */
+    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+        AffinityTopologyVersion expVer = new AffinityTopologyVersion(major, minor);
+
+        waitForRebalancing(id, expVer);
+    }
+
+    /**
+     * Waits for rebalancing on the given node at the topology version built from {@code major} alone.
+     *
+     * @param id Index of the node to wait on.
+     * @param major Major part of the expected topology version.
+     * @throws IgniteCheckedException If waiting failed.
+     */
+    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+        AffinityTopologyVersion expVer = new AffinityTopologyVersion(major);
+
+        waitForRebalancing(id, expVer);
+    }
+
+    /**
+     * Blocks until the rebalance future of every internal cache on the given node belongs to the expected
+     * topology version and has completed with {@code true}. The whole check is repeated whenever a future
+     * has another version or reports missed partitions.
+     *
+     * @param id Index of the node to wait on.
+     * @param top Expected topology version.
+     * @throws IgniteCheckedException If waiting on a rebalance future failed or the thread was interrupted.
+     */
+    protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
+        boolean finished = false;
+
+        while (!finished) {
+            finished = true;
+
+            for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+                GridDhtPartitionDemander.RebalanceFuture fut =
+                    (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
+
+                if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
+                    finished = false;
+
+                    // The expected future is not there yet: back off briefly instead of spinning at full speed.
+                    U.sleep(10);
+
+                    break;
+                }
+                else if (!fut.get()) {
+                    finished = false;
+
+                    log.warning("Rebalancing finished with missed partitions.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts, for every internal cache of every running grid, that the {@code scMap} field of the
+     * preloader's {@code supplier} is empty.
+     */
+    protected void checkSupplyContextMapIsEmpty() {
+        for (Ignite node : G.allGrids()) {
+            IgniteEx nodeEx = (IgniteEx)node;
+
+            for (GridCacheAdapter cache : nodeEx.context().cache().internalCaches()) {
+                // Both objects are obtained by field name through the U.field() helper.
+                Object supplier = U.field(cache.preloader(), "supplier");
+                Map scMap = U.field(supplier, "scMap");
+
+                synchronized (scMap) {
+                    assert scMap.isEmpty();
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        int minute = 60_000;
+
+        // Five minutes.
+        return 5 * minute;
+    }
+
+ /**
+ * Starts four more nodes from two concurrent threads while a third thread rewrites the data, creates a new
+ * cache once topology version (5, 0) has been rebalanced, and then stops nodes one by one while another
+ * thread rewrites the data again. Supply contexts are checked after the topology changes and the data is
+ * verified on node 4.
+ *
+ * @throws Exception If failed.
+ */
+ public void testComplexRebalancing() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ concurrentStartFinished = false;
+ concurrentStartFinished2 = false;
+ concurrentStartFinished3 = false;
+
+ // t1: starts grids 1 and 2, waits for t2, awaits version (5, 0) on all nodes, creates a new cache
+ // and finally waits for t3.
+ Thread t1 = new Thread() {
+ @Override public void run() {
+ try {
+ startGrid(1);
+ startGrid(2);
+
+ // Poll until t2 reports that grids 3 and 4 are started.
+ while (!concurrentStartFinished2) {
+ U.sleep(10);
+ }
+
+ waitForRebalancing(0, 5, 0);
+ waitForRebalancing(1, 5, 0);
+ waitForRebalancing(2, 5, 0);
+ waitForRebalancing(3, 5, 0);
+ waitForRebalancing(4, 5, 0);
+
+ //New cache should start rebalancing.
+ CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+ cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+ cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+ grid(0).getOrCreateCache(cacheRCfg);
+
+ // Poll until t3 reports that data generation is done.
+ while (!concurrentStartFinished3) {
+ U.sleep(10);
+ }
+
+ concurrentStartFinished = true;
+ }
+ catch (Exception e) {
+ // NOTE(review): a failure here is only printed and does not fail the test.
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // t2: starts grids 3 and 4, then releases t1.
+ Thread t2 = new Thread() {
+ @Override public void run() {
+ try {
+ startGrid(3);
+ startGrid(4);
+
+ concurrentStartFinished2 = true;
+ }
+ catch (Exception e) {
+ // NOTE(review): if this happens the flag is never raised and t1 keeps polling.
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // t3: rewrites all caches with iteration 1 values while the nodes are starting.
+ Thread t3 = new Thread() {
+ @Override public void run() {
+ generateData(ignite, 0, 1);
+
+ concurrentStartFinished3 = true;
+ }
+ };
+
+ t1.start();
+ t2.start();// Should cancel t1 rebalancing.
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+
+ // Version (5, 1) is awaited on all five nodes after t1 has created the new cache.
+ waitForRebalancing(0, 5, 1);
+ waitForRebalancing(1, 5, 1);
+ waitForRebalancing(2, 5, 1);
+ waitForRebalancing(3, 5, 1);
+ waitForRebalancing(4, 5, 1);
+
+ awaitPartitionMapExchange(true);
+
+ checkSupplyContextMapIsEmpty();
+
+ checkData(grid(4), 0, 1);
+
+ final Ignite ignite3 = grid(3);
+
+ // t4: rewrites all caches with iteration 2 values through node 3 while nodes are being stopped.
+ Thread t4 = new Thread() {
+ @Override public void run() {
+ generateData(ignite3, 0, 2);
+
+ }
+ };
+
+ t4.start();
+
+ stopGrid(1);
+
+ waitForRebalancing(0, 6);
+ waitForRebalancing(2, 6);
+ waitForRebalancing(3, 6);
+ waitForRebalancing(4, 6);
+
+ awaitPartitionMapExchange(true);
+
+ checkSupplyContextMapIsEmpty();
+
+ stopGrid(0);
+
+ waitForRebalancing(2, 7);
+ waitForRebalancing(3, 7);
+ waitForRebalancing(4, 7);
+
+ awaitPartitionMapExchange(true);
+
+ checkSupplyContextMapIsEmpty();
+
+ stopGrid(2);
+
+ waitForRebalancing(3, 8);
+ waitForRebalancing(4, 8);
+
+ awaitPartitionMapExchange(true);
+
+ checkSupplyContextMapIsEmpty();
+
+ // Node 3 is the one t4 writes through, so t4 is joined before that node is stopped.
+ t4.join();
+
+ stopGrid(3);
+
+ waitForRebalancing(4, 9);
+
+ checkSupplyContextMapIsEmpty();
+
+ // Elapsed time is taken before the final data check, so verification is not included in it.
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ checkData(grid(4), 0, 2);
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+ }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try {
+            super.afterTest();
+        }
+        finally {
+            // Stop the grids even if the parent clean-up fails, so they cannot leak into the next test.
+            stopAllGrids();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
new file mode 100644
index 0000000..831e82d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests rebalancing of a partitioned cache without backups whose keys can be made to fail
+ * on {@code readExternal}.
+ */
+public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest {
+    /** IP finder instance. NOTE(review): not referenced anywhere else in this class. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the cache under test. */
+    protected static String CACHE = "cache";
+
+    /** Read budget: {@link TestKey#readExternal(ObjectInput)} fails once this counter drops to zero or below. */
+    protected static AtomicInteger readCnt = new AtomicInteger();
+
+    /** Externalizable key whose deserialization can be forced to fail through {@link #readCnt}. */
+    private static class TestKey implements Externalizable {
+        /** Key payload. */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * @param field Key payload.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** Public no-arg constructor, as needed by {@link Externalizable}. */
+        public TestKey() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            return field == null ? other.field == null : field.equals(other.field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return field == null ? 0 : field.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            field = (String)in.readObject();
+
+            // Every read consumes one unit of the budget; an exhausted budget makes the read fail.
+            int left = readCnt.decrementAndGet();
+
+            if (left <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<TestKey, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setName(CACHE);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setBackups(0);
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Loads 100 keys into a single node, starts a second node while key reads are failing, and then checks
+     * which keys are readable from the second node before and after the first node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        readCnt.set(Integer.MAX_VALUE);
+
+        startGrid(0);
+
+        for (int i = 0; i < 100; i++) {
+            TestKey key = new TestKey(String.valueOf(i));
+
+            grid(0).cache(CACHE).put(key, i);
+        }
+
+        // With a budget of 1 the very first key read after this point already fails.
+        readCnt.set(1);
+
+        startGrid(1);
+
+        // Restore the budget so that the reads below succeed.
+        readCnt.set(Integer.MAX_VALUE);
+
+        for (int i = 0; i < 50; i++) {
+            TestKey key = new TestKey(String.valueOf(i));
+
+            assert grid(1).cache(CACHE).get(key) != null;
+        }
+
+        stopGrid(0);
+
+        for (int i = 50; i < 100; i++) {
+            TestKey key = new TestKey(String.valueOf(i));
+
+            assert grid(1).cache(CACHE).get(key) == null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index c4ad169..64f1495 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -142,26 +142,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception If failed.
- */
- public void testSingleZeroPoolSize() throws Exception {
- preloadMode = SYNC;
- poolSize = 0;
-
- try {
- startGrid(1);
-
- assert false : "Grid should have been failed to start.";
- }
- catch (IgniteCheckedException e) {
- info("Caught expected exception: " + e);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
* @throws Exception If test failed.
*/
public void testIntegrity() throws Exception {
@@ -602,4 +582,4 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
// No-op.
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
- *
- */
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
- /** */
- private boolean ignorePingResponse;
-
- /** {@inheritDoc} */
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
- IgniteCheckedException {
- if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
- return;
- else
- super.writeToSocket(sock, msg, timeout);
- }
- }
-
- /**
* @throws Exception If any error occurs.
*/
public void testNodeAdded() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ * Discovery SPI for tests: it can drop outgoing ping responses and re-declares
+ * {@code simulateNodeFailure()} as {@code public} so that tests can invoke it.
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    /** When {@code true}, {@link TcpDiscoveryPingResponse} messages are not written to the socket. */
+    public boolean ignorePingResponse;
+
+    /** {@inheritDoc} */
+    @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        throws IOException, IgniteCheckedException {
+        // Silently drop ping responses while the flag is set; everything else goes to the parent.
+        if (ignorePingResponse && msg instanceof TcpDiscoveryPingResponse)
+            return;
+
+        super.writeToSocket(sock, msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        super.simulateNodeFailure();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..41d4b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1228,7 +1229,7 @@ public abstract class GridAbstractTest extends TestCase {
cfg.setCommunicationSpi(commSpi);
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
if (isDebug()) {
discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 28d5c73..71f3ee3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -63,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
@@ -414,6 +416,15 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
*/
@SuppressWarnings("BusyWait")
protected void awaitPartitionMapExchange() throws InterruptedException {
+ awaitPartitionMapExchange(false);
+ }
+
+ /**
+ * @param waitEvicts If {@code true} will wait for evictions finished.
+ * @throws InterruptedException If interrupted.
+ */
+ @SuppressWarnings("BusyWait")
+ protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException {
for (Ignite g : G.allGrids()) {
IgniteKernal g0 = (IgniteKernal)g;
@@ -451,7 +462,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
int actual = owners.size();
- if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)) {
+ GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
+
+ if (affNodes.size() != owners.size() || !affNodes.containsAll(owners) ||
+ (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) {
LT.warn(log(), null, "Waiting for topology map update [" +
"grid=" + g.name() +
", cache=" + cfg.getName() +
@@ -484,7 +498,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
if (i == 0)
start = System.currentTimeMillis();
- if (System.currentTimeMillis() - start > 30_000)
+ if (System.currentTimeMillis() - start > 30_000) {
+ U.dumpThreads(log);
+
throw new IgniteException("Timeout of waiting for topology map update [" +
"grid=" + g.name() +
", cache=" + cfg.getName() +
@@ -493,6 +509,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
", p=" + p +
", readVer=" + readyVer +
", locNode=" + g.cluster().localNode() + ']');
+ }
Thread.sleep(200); // Busy wait.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 796c531..c3c3659 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -49,6 +49,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -135,6 +137,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(IgniteTxReentryColocatedSelfTest.class);
suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
// Test for byte array value special case.
suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
index 0226046..582bfe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
@@ -107,25 +107,33 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testTask() throws Exception {
+ Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+ int initSize = map.size();
+
ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), Task.class, null);
- testMessageSet(fut);
+ testMessageSet(fut, initSize, map);
}
/**
* @throws Exception If failed.
*/
public void testTaskException() throws Exception {
+ Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+ int initSize = map.size();
+
ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), FailTask.class, null);
- testMessageSet(fut);
+ testMessageSet(fut, initSize, map);
}
/**
* @param fut Future to cancel.
* @throws Exception If failed.
*/
- private void testMessageSet(IgniteFuture<?> fut) throws Exception {
+ private void testMessageSet(IgniteFuture<?> fut, int initSize, Map map) throws Exception {
cancelLatch.await();
assertTrue(fut.cancel());
@@ -134,11 +142,9 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
assertTrue(U.await(finishLatch, 5000, MILLISECONDS));
- Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
-
info("Map: " + map);
- assertTrue(map.isEmpty());
+ assertEquals(map.size(), initSize);
}
/**