You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/22 17:42:00 UTC
[28/50] [abbrv] ignite git commit: Ignite-1913
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
index ffb50ca..9d6e82f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.util.typedef.CAX;
@@ -305,10 +305,10 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
GridDhtPartitionFullMap fullMap = top.partitionMap(true);
- for (Map.Entry<UUID, GridDhtPartitionMap> fe : fullMap.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> fe : fullMap.entrySet()) {
UUID nodeId = fe.getKey();
- GridDhtPartitionMap m = fe.getValue();
+ GridDhtPartitionMap2 m = fe.getValue();
for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
int p = e.getKey();
@@ -439,12 +439,12 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
assert orig.keySet().equals(cmp.keySet());
- for (Map.Entry<UUID, GridDhtPartitionMap> entry : orig.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> entry : orig.entrySet()) {
UUID nodeId = entry.getKey();
- GridDhtPartitionMap nodeMap = entry.getValue();
+ GridDhtPartitionMap2 nodeMap = entry.getValue();
- GridDhtPartitionMap cmpMap = cmp.get(nodeId);
+ GridDhtPartitionMap2 cmpMap = cmp.get(nodeId);
assert cmpMap != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
index a71475c..34e4333 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java
@@ -38,7 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
@@ -361,7 +361,7 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest {
GridDhtPartitionFullMap allParts = dht.topology().partitionMap(false);
- for (GridDhtPartitionMap parts : allParts.values()) {
+ for (GridDhtPartitionMap2 parts : allParts.values()) {
if (!parts.nodeId().equals(g.cluster().localNode().id())) {
for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
int p = e.getKey();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
index df55f7e..dd46e23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -114,7 +114,7 @@ public class GridCacheDhtTestUtils {
List<Integer> affParts = new LinkedList<>();
- GridDhtPartitionMap map = dht.topology().partitions(locNode.id());
+ GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id());
if (map != null)
for (int p : map.keySet())
@@ -146,7 +146,7 @@ public class GridCacheDhtTestUtils {
System.out.println("\nNode map:");
- for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(false).entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : top.partitionMap(false).entrySet()) {
List<Integer> list = new ArrayList<>(e.getValue().keySet());
Collections.sort(list);
@@ -184,7 +184,7 @@ public class GridCacheDhtTestUtils {
// They should be in topology in OWNING state.
Collection<Integer> affParts = new HashSet<>();
- GridDhtPartitionMap map = dht.topology().partitions(locNode.id());
+ GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id());
if (map != null)
for (int p : map.keySet())
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
new file mode 100644
index 0000000..a1ea7ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ *
+ */
+public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** partitioned cache name. */
+ protected static String CACHE = null;
+
+ /** */
+ private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>();
+
+ /** */
+ private volatile boolean record = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = new DelayableCommunicationSpi();
+
+ commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ commSpi.setTcpNoDelay(true);
+
+ iCfg.setCommunicationSpi(commSpi);
+
+ return iCfg;
+ }
+
+ /**
+ * Helps to delay GridDhtPartitionsFullMessages.
+ */
+ public class DelayableCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(final ClusterNode node, final Message msg,
+ final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ final Object msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
+ ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
+ rs.putIfAbsent(node.id(), new Runnable() {
+ @Override public void run() {
+ DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+ }
+ });
+ }
+ else
+ try {
+ super.sendMessage(node, msg, ackClosure);
+ }
+ catch (Exception e) {
+ U.log(null, e);
+ }
+
+ }
+ }
+
+ /**
+ * @throws Exception e.
+ */
+ public void test() throws Exception {
+ startGrid(0);
+
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setName(CACHE);
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cfg.setBackups(1);
+
+ ignite(0).getOrCreateCache(cfg);
+
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+
+ awaitPartitionMapExchange(true);
+
+ for (int i = 0; i < 2; i++) {
+ stopGrid(3);
+
+ awaitPartitionMapExchange(true);
+
+ startGrid(3);
+
+ awaitPartitionMapExchange(true);
+ }
+
+ startGrid(4);
+
+ awaitPartitionMapExchange(true);
+
+ assert rs.isEmpty();
+
+ record = true;
+
+ while (rs.size() < 3) { // N - 1 nodes.
+ U.sleep(10);
+ }
+
+ ignite(0).destroyCache(CACHE);
+
+ ignite(0).getOrCreateCache(cfg);
+
+ awaitPartitionMapExchange();
+
+ for (Runnable r : rs.values()) {
+ r.run();
+ }
+
+ U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+
+ stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case obsolete partition map accepted.
+
+ awaitPartitionMapExchange();
+
+ long topVer0 = grid(0).context().cache().context().exchange().readyAffinityVersion().topologyVersion();
+ long topVer1 = grid(1).context().cache().context().exchange().readyAffinityVersion().topologyVersion();
+ long topVer2 = grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion();
+
+ stopGrid(4); // Should force exchange in case exchange manager alive.
+
+ awaitPartitionMapExchange();
+
+ // Will fail in case exchange-workers are dead.
+ assert grid(0).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer0;
+ assert grid(1).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer1;
+ assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index ea13cdd..b02d022 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
@@ -141,6 +142,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
+ suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);
// Test for byte array value special case.
suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 9cf1e75..f515a78 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -58,7 +58,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -358,7 +358,7 @@ public class GridReduceQueryExecutor {
private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
- for (GridDhtPartitionMap map : fullMap.values()) {
+ for (GridDhtPartitionMap2 map : fullMap.values()) {
if (map.hasMovingPartitions())
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
index 83c50bd..ac91b51 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.yardstickframework.BenchmarkUtils;
@@ -53,7 +53,7 @@ public class WaitMapExchangeFinishCallable implements IgniteCallable<Void> {
boolean success = true;
if (top.topologyVersion().topologyVersion() == ignite.cluster().topologyVersion()) {
- for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(true).entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : top.partitionMap(true).entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> p : e.getValue().entrySet()) {
if (p.getValue() != GridDhtPartitionState.OWNING) {
BenchmarkUtils.println("Not owning partition [part=" + p.getKey() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
index 83fc58f..1a700c2 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.mxbean.IgniteMXBean;
@@ -196,7 +196,7 @@ public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheA
GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true);
- for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
log.info("Checking node: " + e.getKey());
for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) {