You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:35:28 UTC
[36/50] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol
change: notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
new file mode 100644
index 0000000..6d90d0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
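+ * Tests of the atomic cache update protocol under node failures, blocked messages and blocked rebalancing.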
+ */
+public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String TEST_CACHE = "testCache";
+
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** */
+ private boolean blockRebalance;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Every node gets the shared IP finder and its own {@link TestRecordingCommunicationSpi}; client
+ * mode and the optional cache configuration are taken from the current values of the
+ * {@code client} and {@code ccfg} fields.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+ igniteCfg.setConsistentId(gridName);
+ igniteCfg.setClientMode(client);
+ igniteCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+ discoSpi.setMaxMissedClientHeartbeats(1000);
+
+ if (ccfg != null)
+ igniteCfg.setCacheConfiguration(ccfg);
+
+ return igniteCfg;
+ }
+
+ /**
+ *
+ */
+ private void blockRebalance() {
+ for (Ignite node : G.allGrids()) {
+ testSpi(node).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ Object msg0 = msg.message();
+
+ return (msg0 instanceof GridDhtPartitionSupplyMessage || msg0 instanceof GridDhtPartitionSupplyMessageV2)
+ && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE);
+ }
+ });
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all started nodes before delegating to the parent implementation.
+ */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Checks putAll when the primary node of one of the keys (node 0) is stopped.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllPrimaryFailure1() throws Exception {
+ putAllPrimaryFailure(true, false);
+ }
+
+ /**
+ * Checks putAll when the primary node of one of the keys (node 0) is stopped, with the
+ * {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllPrimaryFailure1_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ putAllPrimaryFailure(true, false);
+ }
+
+ /**
+ * Checks putAll when the primary nodes of both keys (nodes 0 and 1) are stopped.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllPrimaryFailure2() throws Exception {
+ putAllPrimaryFailure(true, true);
+ }
+
+ /**
+ * Checks putAll when the primary nodes of both keys (nodes 0 and 1) are stopped, with the
+ * {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllPrimaryFailure2_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ putAllPrimaryFailure(true, true);
+ }
+
+ /**
+ * Starts four servers and a client, then runs an asynchronous putAll from the client for two keys:
+ * one with primary on node 0 and one with primary on node 1. The client's
+ * {@code GridNearAtomicFullUpdateRequest} and {@code GridNearAtomicCheckUpdateRequest} messages to
+ * each node selected for failure are blocked, and the operation is asserted to be incomplete
+ * after 500 ms. The selected nodes are then stopped, the putAll must finish, and the data is
+ * verified on all remaining nodes.
+ *
+ * @param fail0 Fail node 0 flag.
+ * @param fail1 Fail node 1 flag.
+ * @throws Exception If failed.
+ */
+ private void putAllPrimaryFailure(boolean fail0, boolean fail1) throws Exception {
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ if (!blockRebalance)
+ awaitPartitionMapExchange();
+
+ Ignite srv0 = ignite(0);
+ Ignite srv1 = ignite(1);
+
+ Integer key1 = primaryKey(srv0.cache(TEST_CACHE));
+ Integer key2 = primaryKey(srv1.cache(TEST_CACHE));
+
+ Map<Integer, Integer> map = new HashMap<>();
+ map.put(key1, key1);
+ map.put(key2, key2);
+
+ assertEquals(2, map.size());
+
+ if (fail0) {
+ testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
+ testSpi(client).blockMessages(GridNearAtomicCheckUpdateRequest.class, srv0.name());
+ }
+
+ if (fail1) {
+ testSpi(client).blockMessages(GridNearAtomicFullUpdateRequest.class, srv1.name());
+ testSpi(client).blockMessages(GridNearAtomicCheckUpdateRequest.class, srv1.name());
+ }
+
+ log.info("Start put [key1=" + key1 + ", key2=" + key2 + ']');
+
+ nearAsyncCache.putAll(map);
+
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ if (fail0)
+ stopGrid(0);
+
+ if (fail1)
+ stopGrid(1);
+
+ fut.get();
+
+ checkData(map);
+ }
+
+ /**
+ * Checks putAll when a backup node is stopped while its response to the client is blocked.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllBackupFailure1() throws Exception {
+ putAllBackupFailure1();
+ }
+
+ /**
+ * Checks putAll when a backup node is stopped while its response to the client is blocked, with
+ * the {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllBackupFailure1_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ putAllBackupFailure1();
+ }
+
+ /**
+ * Starts four servers and a client, picks three keys with primary on node 0 and blocks
+ * {@code GridDhtAtomicNearResponse} messages from the backup node of the first key to the client.
+ * The asynchronous putAll is asserted to be incomplete after 500 ms; the backup is then stopped,
+ * the putAll must finish, and the data is verified on all remaining nodes.
+ *
+ * @throws Exception If failed.
+ */
+ private void putAllBackupFailure1() throws Exception {
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ if (!blockRebalance)
+ awaitPartitionMapExchange();
+
+ Ignite srv0 = ignite(0);
+
+ List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), 3);
+
+ Ignite backup = backup(client.affinity(TEST_CACHE), keys.get(0));
+
+ testSpi(backup).blockMessages(GridDhtAtomicNearResponse.class, client.name());
+
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (Integer key : keys)
+ map.put(key, key);
+
+ log.info("Start put [map=" + map + ']');
+
+ nearAsyncCache.putAll(map);
+
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ stopGrid(backup.name());
+
+ fut.get();
+
+ checkData(map);
+ }
+
+ /**
+ * Checks a single put when the backup node is stopped while its response to the client is blocked.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutBackupFailure1() throws Exception {
+ putBackupFailure1();
+ }
+
+ /**
+ * Checks a single put when the backup node is stopped while its response to the client is blocked,
+ * with the {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutBackupFailure1_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ putBackupFailure1();
+ }
+
+ /**
+ * Starts four servers and a client, picks a key with primary on node 0 and blocks
+ * {@code GridDhtAtomicNearResponse} messages from the key's backup node to the client. The
+ * asynchronous put is asserted to be incomplete after 500 ms; the backup is then stopped, the put
+ * must finish, and the data is verified on all remaining nodes.
+ *
+ * @throws Exception If failed.
+ */
+ private void putBackupFailure1() throws Exception {
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ if (!blockRebalance)
+ awaitPartitionMapExchange();
+
+ Ignite srv0 = ignite(0);
+
+ Integer key = primaryKey(srv0.cache(TEST_CACHE));
+
+ Ignite backup = backup(client.affinity(TEST_CACHE), key);
+
+ testSpi(backup).blockMessages(GridDhtAtomicNearResponse.class, client.name());
+
+ log.info("Start put [key=" + key + ']');
+
+ nearAsyncCache.put(key, key);
+
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ stopGrid(backup.name());
+
+ fut.get();
+
+ checkData(F.asMap(key, key));
+ }
+
+ /**
+ * Checks remap of a single FULL_ASYNC put after the key's primary node changes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFullAsyncPutRemap() throws Exception {
+ fullAsyncRemap(false);
+ }
+
+ /**
+ * Checks remap of a FULL_ASYNC putAll after the keys' primary node changes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFullAsyncPutAllRemap() throws Exception {
+ fullAsyncRemap(true);
+ }
+
+ /**
+ * Starts one server and one client, creates a FULL_ASYNC cache from the client and blocks the
+ * client's {@code GridNearAtomicSingleUpdateRequest} and {@code GridNearAtomicFullUpdateRequest}
+ * messages to the server. After issuing put (one key) or putAll (three keys) for keys whose primary
+ * is the server, starts additional servers (node indexes 2 to 9 at most) until the first server is
+ * no longer primary for any of the keys. Blocking is then stopped via {@code stopBlock(true)}, the
+ * test waits up to 5 seconds for all values to become readable through the client cache, and the
+ * data is verified on all nodes.
+ *
+ * @param putAll Test putAll flag.
+ * @throws Exception If failed.
+ */
+ private void fullAsyncRemap(boolean putAll) throws Exception {
+ Ignite srv0 = startGrid(0);
+
+ client = true;
+
+ Ignite clientNode = startGrid(1);
+
+ client = false;
+
+ final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
+
+ List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1);
+
+ testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
+ testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ for (Integer key : keys)
+ map.put(key, -key);
+
+ if (putAll)
+ nearCache.putAll(map);
+ else
+ nearCache.put(keys.get(0), map.get(keys.get(0)));
+
+ int nodeIdx = 2;
+
+ Affinity<Object> aff = clientNode.affinity(TEST_CACHE);
+
+ int keysMoved;
+
+ do {
+ startGrid(nodeIdx);
+
+ awaitPartitionMapExchange();
+
+ keysMoved = 0;
+
+ for (Integer key : keys) {
+ if (!aff.isPrimary(srv0.cluster().localNode(), key))
+ keysMoved++;
+ }
+
+ if (keysMoved == keys.size())
+ break;
+
+ nodeIdx++;
+ }
+ while (nodeIdx < 10);
+
+ assertEquals(keys.size(), keysMoved);
+
+ testSpi(clientNode).stopBlock(true);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (Integer key : map.keySet()) {
+ if (nearCache.get(key) == null)
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ checkData(map);
+ }
+
+ /**
+ * Checks PRIMARY_SYNC mode with two servers and a client: while
+ * {@code GridDhtAtomicSingleUpdateRequest} messages from node 0 (primary for the key) to node 1 are
+ * blocked, an asynchronous put from the client must complete within 5 seconds, the value must be
+ * readable through node 0 and node 1's local peek must return {@code null}. After blocking is
+ * stopped via {@code stopBlock(true)}, the test waits up to 5 seconds for the value to appear on
+ * node 1 and verifies the data on all nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutPrimarySync() throws Exception {
+ startGrids(2);
+
+ client = true;
+
+ Ignite clientNode = startGrid(2);
+
+ client = false;
+
+ final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, PRIMARY_SYNC));
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ awaitPartitionMapExchange();
+
+ Ignite srv0 = grid(0);
+ final Ignite srv1 = grid(1);
+
+ final Integer key = primaryKey(srv0.cache(TEST_CACHE));
+
+ testSpi(srv0).blockMessages(GridDhtAtomicSingleUpdateRequest.class, srv1.name());
+
+ nearAsyncCache.put(key, key);
+
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ fut.get(5, TimeUnit.SECONDS);
+
+ assertEquals(key, srv0.cache(TEST_CACHE).get(key));
+
+ assertNull(srv1.cache(TEST_CACHE).localPeek(key));
+
+ testSpi(srv0).stopBlock(true);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv1.cache(TEST_CACHE).localPeek(key) != null;
+ }
+ }, 5000);
+
+ checkData(F.asMap(key, key));
+ }
+
+ /**
+ * Starts two servers and a client, issues an asynchronous FULL_SYNC put from the client for a key
+ * with primary on node 0, blocks {@code GridDhtAtomicNearResponse} messages from node 1 to the
+ * client and stops the client. The test then waits up to 5 seconds for the atomic futures count on
+ * node 0 to reach zero, asserts that it is zero on both servers and verifies the data on the
+ * remaining nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutNearNodeFailure() throws Exception {
+ startGrids(2);
+
+ client = true;
+
+ Ignite clientNode = startGrid(2);
+
+ final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_SYNC));
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ awaitPartitionMapExchange();
+
+ final Ignite srv0 = grid(0);
+ final Ignite srv1 = grid(1);
+
+ final Integer key = primaryKey(srv0.cache(TEST_CACHE));
+
+ nearAsyncCache.put(key, key);
+
+ testSpi(srv1).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name());
+
+ stopGrid(2);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount() == 0;
+ }
+ }, 5000);
+
+ assertEquals(0, ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount());
+ assertEquals(0, ((IgniteKernal)srv1).context().cache().context().mvcc().atomicFuturesCount());
+
+ checkData(F.asMap(key, key));
+ }
+
+ /**
+ * Starts four servers and a client, blocks {@code GridDhtAtomicNearResponse} messages from every
+ * server to the client and issues an asynchronous FULL_SYNC putAll of keys 0..99 from the client.
+ * Once all values are readable through node 0 the client is stopped; the test then waits up to
+ * 5 seconds for the atomic futures count to reach zero on every server, asserts it and verifies
+ * the data on the remaining nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllNearNodeFailure() throws Exception {
+ final int SRVS = 4;
+
+ startGrids(SRVS);
+
+ client = true;
+
+ Ignite clientNode = startGrid(SRVS);
+
+ final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_SYNC));
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < SRVS; i++)
+ testSpi(grid(i)).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name());
+
+ final Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i);
+
+ nearAsyncCache.putAll(map);
+
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ // Typed cache reference instead of a raw type.
+ IgniteCache<Integer, Integer> cache = ignite(0).cache(TEST_CACHE);
+
+ for (Integer key : map.keySet()) {
+ if (cache.get(key) == null)
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ assertTrue(wait);
+
+ stopGrid(SRVS);
+
+ // The result is not checked here: the assertions below fail if futures are still registered.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (int i = 0; i < SRVS; i++) {
+ if (grid(i).context().cache().context().mvcc().atomicFuturesCount() != 0)
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ for (int i = 0; i < SRVS; i++)
+ assertEquals(0, grid(i).context().cache().context().mvcc().atomicFuturesCount());
+
+ checkData(map);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with no backups.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations0() throws Exception {
+ cacheOperations(0);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with no backups, with the
+ * {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations_UnstableTopology0() throws Exception {
+ blockRebalance = true;
+
+ cacheOperations(0);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with one backup.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations1() throws Exception {
+ cacheOperations(1);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with one backup, with the
+ * {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations_UnstableTopology1() throws Exception {
+ blockRebalance = true;
+
+ cacheOperations(1);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with two backups.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations2() throws Exception {
+ cacheOperations(2);
+ }
+
+ /**
+ * Runs the basic cache operations against a cache with two backups, with the
+ * {@code blockRebalance} flag set while servers are started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheOperations_UnstableTopology2() throws Exception {
+ blockRebalance = true;
+
+ cacheOperations(2);
+ }
+
+ /**
+ * Starts four servers with a FULL_SYNC cache having the given number of backups plus a client,
+ * then runs replace, remove, invoke, invokeAll, put (keys 0..99) and removeAll (even keys) through
+ * the client cache. No results are asserted in this method, so it only verifies that the
+ * operations complete without throwing.
+ *
+ * @param backups Number of backups.
+ * @throws Exception If failed.
+ */
+ private void cacheOperations(int backups) throws Exception {
+ ccfg = cacheConfiguration(backups, FULL_SYNC);
+
+ final int SRVS = 4;
+
+ startServers(SRVS);
+
+ client = true;
+
+ Ignite clientNode = startGrid(SRVS);
+
+ final IgniteCache<Integer, Integer> nearCache = clientNode.cache(TEST_CACHE);
+
+ Integer key = primaryKey(ignite(0).cache(TEST_CACHE));
+
+ nearCache.replace(key, 1);
+
+ nearCache.remove(key);
+
+ nearCache.invoke(key, new SetValueEntryProcessor(null));
+
+ Map<Integer, SetValueEntryProcessor> map = new HashMap<>();
+
+ List<Integer> keys = primaryKeys(ignite(0).cache(TEST_CACHE), 2);
+
+ map.put(keys.get(0), new SetValueEntryProcessor(1));
+ map.put(keys.get(1), new SetValueEntryProcessor(null));
+
+ nearCache.invokeAll(map);
+
+ Set<Integer> rmvAllKeys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++) {
+ nearCache.put(i, i);
+
+ if (i % 2 == 0)
+ rmvAllKeys.add(i);
+ }
+
+ nearCache.removeAll(rmvAllKeys);
+ }
+
+ /**
+ * With the {@code blockRebalance} flag set, starts four servers and a client and blocks every
+ * {@code GridDhtAtomicAbstractUpdateRequest} message in node 0's communication SPI. An asynchronous
+ * put from the client for a key with primary on node 0 is asserted to be incomplete after 500 ms;
+ * node 0 is then stopped, the put must finish, and the data is verified on the remaining nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutMissedDhtRequest_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+
+ Integer key = primaryKey(ignite(0).cache(TEST_CACHE));
+
+ log.info("Start put [key=" + key + ']');
+
+ nearAsyncCache.put(key, key);
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ stopGrid(0);
+
+ fut.get();
+
+ checkData(F.asMap(key, key));
+ }
+
+ /**
+ * Checks putAll with blocked DHT update requests when only node 0 is stopped.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllMissedDhtRequest_UnstableTopology1() throws Exception {
+ putAllMissedDhtRequest_UnstableTopology(true, false);
+ }
+
+ /**
+ * Checks putAll with blocked DHT update requests when both node 0 and node 2 are stopped.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutAllMissedDhtRequest_UnstableTopology2() throws Exception {
+ putAllMissedDhtRequest_UnstableTopology(true, true);
+ }
+
+ /**
+ * With the {@code blockRebalance} flag set, starts four servers and a client and runs an
+ * asynchronous putAll from the client for two keys: one with primary on node 0 and one with
+ * primary on node 2. {@code GridDhtAtomicAbstractUpdateRequest} messages are blocked in the
+ * communication SPI of each node selected for failure; the selected nodes are then stopped, the
+ * putAll must finish, and the data is verified on the remaining nodes.
+ *
+ * @param fail0 If {@code true}, DHT update requests are blocked on node 0 and node 0 is stopped.
+ * @param fail1 If {@code true}, DHT update requests are blocked on node 2 and node 2 is stopped.
+ * @throws Exception If failed.
+ */
+ private void putAllMissedDhtRequest_UnstableTopology(boolean fail0, boolean fail1) throws Exception {
+ blockRebalance = true;
+
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ if (fail0) {
+ testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+ }
+ if (fail1) {
+ testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+ }
+
+ Integer key1 = primaryKey(ignite(0).cache(TEST_CACHE));
+ Integer key2 = primaryKey(ignite(2).cache(TEST_CACHE));
+
+ // Log both keys (key1 used to be printed twice).
+ log.info("Start put [key1=" + key1 + ", key2=" + key2 + ']');
+
+ Map<Integer, Integer> map = new HashMap<>();
+ map.put(key1, 10);
+ map.put(key2, 20);
+
+ nearAsyncCache.putAll(map);
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ // The update must not complete while DHT update requests are blocked.
+ assertFalse(fut.isDone());
+
+ if (fail0)
+ stopGrid(0);
+ if (fail1)
+ stopGrid(2);
+
+ fut.get();
+
+ checkData(map);
+ }
+
+ /**
+ * Reads every expected key through the test cache of each started node and asserts that the
+ * returned value equals the expected one.
+ *
+ * @param expData Expected cache data, must not be empty.
+ */
+ private void checkData(Map<Integer, Integer> expData) {
+ assert !expData.isEmpty();
+
+ List<Ignite> allNodes = G.allGrids();
+
+ assertFalse(allNodes.isEmpty());
+
+ for (Ignite ignite : allNodes) {
+ IgniteCache<Integer, Integer> nodeCache = ignite.cache(TEST_CACHE);
+
+ for (Map.Entry<Integer, Integer> exp : expData.entrySet()) {
+ Integer key = exp.getKey();
+
+ String errMsg = "Invalid value [key=" + key + ", node=" + ignite.name() + ']';
+
+ assertEquals(errMsg, exp.getValue(), nodeCache.get(key));
+ }
+ }
+ }
+
+ /**
+ * Looks through all started nodes for the first one that owns the key according to the given
+ * affinity without being its primary; fails the test if there is no such node.
+ *
+ * @param aff Affinity.
+ * @param key Key.
+ * @return Backup node for given key.
+ */
+ private Ignite backup(Affinity<Object> aff, Object key) {
+ for (Ignite candidate : G.allGrids()) {
+ ClusterNode locNode = candidate.cluster().localNode();
+
+ if (!aff.isPrimaryOrBackup(locNode, key))
+ continue;
+
+ if (aff.isPrimary(locNode, key))
+ continue;
+
+ return candidate;
+ }
+
+ fail("Failed to find backup for key: " + key);
+
+ return null;
+ }
+
+ /**
+ * Returns the communication SPI taken from the node's configuration, cast to the recording test
+ * SPI type.
+ *
+ * @param node Node.
+ * @return Node communication SPI.
+ */
+ private TestRecordingCommunicationSpi testSpi(Ignite node) {
+ Object commSpi = node.configuration().getCommunicationSpi();
+
+ return (TestRecordingCommunicationSpi)commSpi;
+ }
+
+ /**
+ * Builds the configuration of the atomic test cache with asynchronous rebalancing.
+ *
+ * @param backups Number of backups.
+ * @param writeSync Cache write synchronization mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups,
+ CacheWriteSynchronizationMode writeSync) {
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ cacheCfg.setName(TEST_CACHE);
+ cacheCfg.setBackups(backups);
+ cacheCfg.setWriteSynchronizationMode(writeSync);
+ cacheCfg.setAtomicityMode(ATOMIC);
+ cacheCfg.setRebalanceMode(ASYNC);
+
+ return cacheCfg;
+ }
+
+ /**
+ * Starts {@code cnt} server nodes. All nodes except the last one are started first and partition
+ * map exchange is awaited; when the {@code blockRebalance} flag is set, supply messages are
+ * blocked on those nodes before the last node is started.
+ *
+ * @param cnt Number of server nodes.
+ * @throws Exception If failed.
+ */
+ private void startServers(int cnt) throws Exception {
+ int lastIdx = cnt - 1;
+
+ startGrids(lastIdx);
+
+ awaitPartitionMapExchange();
+
+ if (blockRebalance)
+ blockRebalance();
+
+ startGrid(lastIdx);
+ }
+
+ /**
+ *
+ */
+ public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> {
+ /** */
+ private Integer val;
+
+ /**
+ * @param val Value.
+ */
+ SetValueEntryProcessor(Integer val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) {
+ if (val != null)
+ entry.setValue(val);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
deleted file mode 100644
index 9057507..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.configuration.IgniteConfiguration;
-
-/**
- *
- */
-public class GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest extends
- GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setStripedPoolSize(-1);
-
- return cfg;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
index 7646741..9505b24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
@@ -40,11 +40,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -147,7 +149,7 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest {
startGrids();
ClusterNode n1 = F.first(aff.nodes(aff.partition(1), grid(0).cluster().nodes()));
- ClusterNode n2 = F.first(aff.nodes(aff.partition(2), grid(0).cluster().nodes()));
+ final ClusterNode n2 = F.first(aff.nodes(aff.partition(2), grid(0).cluster().nodes()));
assertNotNull(n1);
assertNotNull(n2);
@@ -164,7 +166,7 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest {
assertNull(cache1.getAndPut(1, "v1"));
assertNull(cache1.getAndPut(2, "v2"));
- GridDhtCacheEntry e1 = (GridDhtCacheEntry)dht(cache1).entryEx(1);
+ final GridDhtCacheEntry e1 = (GridDhtCacheEntry)dht(cache1).entryEx(1);
GridDhtCacheEntry e2 = (GridDhtCacheEntry)dht(cache2).entryEx(2);
assertNotNull(e1.readers());
@@ -207,6 +209,17 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest {
assertNotNull(cache1.getAndPut(1, "z1"));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return !e1.readers().contains(n2.id());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 5000);
+
// Node 1 still has node2 in readers map.
assertFalse(e1.readers().contains(n2.id()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
deleted file mode 100644
index 7ba3144..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.configuration.IgniteConfiguration;
-
-/**
- *
- */
-public class GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest extends
- GridCachePartitionedMultiNodeFullApiSelfTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setStripedPoolSize(-1);
-
- return cfg;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 958bb5c..39e995a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -771,7 +771,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return TimeUnit.SECONDS.toMillis(15);
+ return TimeUnit.SECONDS.toMillis(2 * 60);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index cb8e755..716bb0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -258,15 +258,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
qryClnCache.put(key, -1);
qryClnCache.put(keys.get(0), 100);
- }
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return lsnr.evts.size() == 1;
- }
- }, 5000);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr.evts.size() == 1;
+ }
+ }, 5000);
- assertEquals(lsnr.evts.size(), 1);
+ assertEquals(lsnr.evts.size(), 1);
+ }
}
/**
@@ -480,7 +480,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
if (rnd.nextBoolean())
cache = qryClient.cache(null);
else {
- for (int j = 0; j < 10; j++) {
+ for (int j = 0; j < 1000; j++) {
int nodeIdx = rnd.nextInt(SRV_NODES);
if (killedNode != nodeIdx) {
@@ -1150,11 +1150,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
boolean lostAllow, boolean wait) throws Exception {
if (wait) {
GridTestUtils.waitForCondition(new PA() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return expEvts.size() == lsnr.size();
}
- }, 2000L);
+ }, 10_000L);
}
synchronized (lsnr) {
@@ -1970,9 +1969,9 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
- return lsnr.size() <= size;
+ return lsnr.size() >= size;
}
- }, 2000L);
+ }, 10_000L);
List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
@@ -2300,16 +2299,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
log.info("Batch loaded. Iteration: " + iteration);
- final long cnt = lsnr.count();
-
final long expCnt = putCnt * stableNodeCnt + ignoredDupEvts;
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return cnt == expCnt;
+ return lsnr.count() == expCnt;
}
}, 6_000);
+ final long cnt = lsnr.count();
+
if (cnt != expCnt) {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index ecfb4e8..efac24a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -132,7 +132,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
1,
EventType.UPDATED,
- new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}),
+ new KeyCacheObjectImpl(1, new byte[] {0, 0, 0, 1}, 1),
new CacheObjectImpl(2, new byte[] {0, 0, 0, 2}),
new CacheObjectImpl(2, new byte[] {0, 0, 0, 3}),
true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java
index 5591f2c..b7b6966 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioEmbeddedFutureSelfTest.java
@@ -32,7 +32,7 @@ public class GridNioEmbeddedFutureSelfTest extends GridCommonAbstractTest {
*/
public void testNioEmbeddedFuture() throws Exception {
// Original future.
- final GridNioFutureImpl<Integer> origFut = new GridNioFutureImpl<>();
+ final GridNioFutureImpl<Integer> origFut = new GridNioFutureImpl<>(null);
// Embedded future to test.
final GridNioEmbeddedFuture<Integer> embFut = new GridNioEmbeddedFuture<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
index 684ae01..44a1eff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/nio/GridNioFutureSelfTest.java
@@ -35,24 +35,23 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
* Test for NIO future.
*/
public class GridNioFutureSelfTest extends GridCommonAbstractTest {
-
/**
* @throws Exception If failed.
*/
public void testOnDone() throws Exception {
- GridNioFutureImpl<String> fut = new GridNioFutureImpl<>();
+ GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null);
fut.onDone();
assertNull(fut.get());
- fut = new GridNioFutureImpl<>();
+ fut = new GridNioFutureImpl<>(null);
fut.onDone("test");
assertEquals("test", fut.get());
- fut = new GridNioFutureImpl<>();
+ fut = new GridNioFutureImpl<>(null);
fut.onDone(new IgniteCheckedException("TestMessage"));
@@ -64,7 +63,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
}
}, IgniteCheckedException.class, "TestMessage");
- fut = new GridNioFutureImpl<>();
+ fut = new GridNioFutureImpl<>(null);
fut.onDone("test", new IgniteCheckedException("TestMessage"));
@@ -76,7 +75,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
}
}, IgniteCheckedException.class, "TestMessage");
- fut = new GridNioFutureImpl<>();
+ fut = new GridNioFutureImpl<>(null);
fut.onDone("test");
@@ -86,12 +85,12 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testOnCancelled() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridNioFutureImpl<String> fut = new GridNioFutureImpl<>();
+ GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null);
fut.onCancelled();
@@ -101,7 +100,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridNioFutureImpl<String> fut = new GridNioFutureImpl<>();
+ GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null);
fut.onCancelled();
@@ -116,7 +115,7 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testListenSyncNotify() throws Exception {
- GridNioFutureImpl<String> fut = new GridNioFutureImpl<>();
+ GridNioFutureImpl<String> fut = new GridNioFutureImpl<>(null);
int lsnrCnt = 10;
@@ -167,9 +166,9 @@ public class GridNioFutureSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testGet() throws Exception {
- GridNioFutureImpl<Object> unfinished = new GridNioFutureImpl<>();
- GridNioFutureImpl<Object> finished = new GridNioFutureImpl<>();
- GridNioFutureImpl<Object> cancelled = new GridNioFutureImpl<>();
+ GridNioFutureImpl<Object> unfinished = new GridNioFutureImpl<>(null);
+ GridNioFutureImpl<Object> finished = new GridNioFutureImpl<>(null);
+ GridNioFutureImpl<Object> cancelled = new GridNioFutureImpl<>(null);
finished.onDone("Finished");
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index d403784..e6aab9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioFilterChain;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -114,7 +116,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
proceedExceptionCaught(ses, ex);
}
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) {
sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME));
sndMsgObj.compareAndSet(null, msg);
@@ -155,7 +157,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
chain.onSessionIdleTimeout(ses);
chain.onSessionWriteTimeout(ses);
assertNull(chain.onSessionClose(ses));
- assertNull(chain.onSessionWrite(ses, snd, true));
+ assertNull(chain.onSessionWrite(ses, snd, true, null));
assertEquals("DCBA", connectedEvt.get());
assertEquals("DCBA", disconnectedEvt.get());
@@ -210,10 +212,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
chainMeta(ses, MESSAGE_WRITE_META_NAME);
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
@@ -349,7 +351,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void sendNoFuture(Object msg) {
+ @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index ab21165..a59b6d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -118,7 +118,7 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
* @return Swap key.
*/
private SwapKey key(int i) {
- return new SwapKey(new KeyCacheObjectImpl(i, U.intToBytes(i)), i % 11, U.intToBytes(i));
+ return new SwapKey(new KeyCacheObjectImpl(i, U.intToBytes(i), i), i % 11, U.intToBytes(i));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
index 17757ab..96a8a33 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOffHeapTieredFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeoutFullApiTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyFairAffinityMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeP2PDisabledFullApiSelfTest;
@@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAto
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderFairAffinityMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeP2PDisabledFullApiSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapTieredFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWrityOrderOffHeapMultiNodeFullApiSelfTest;
@@ -76,9 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeCounterSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeP2PDisabledFullApiSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNearOnlyNoPrimaryFullApiSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapTieredFullApiSelfTest;
@@ -230,10 +228,6 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheReplicatedFullApiMultithreadedSelfTest.class);
suite.addTestSuite(GridCachePartitionedFullApiMultithreadedSelfTest.class);
- // Disabled striped pool.
- suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.class);
- suite.addTestSuite(GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.class);
-
// Other.
suite.addTestSuite(GridCacheClearSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 03204e2..6fc6846 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinity
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest;
import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest;
@@ -66,6 +67,8 @@ public class IgniteCacheTestSuite5 extends TestSuite {
suite.addTestSuite(CacheRebalancingSelfTest.class);
+ suite.addTestSuite(IgniteCacheAtomicProtocolTest.class);
+
suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);
return suite;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index 9204c97..8a20eec 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -1306,11 +1308,14 @@ public class HadoopExternalCommunication {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage))
log.warning("Writing message before handshake has finished [ses=" + ses + ", msg=" + msg + ']');
- return proceedSessionWrite(ses, msg, fut);
+ return proceedSessionWrite(ses, msg, fut, ackC);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
index 3f33fb7..08981af 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.nio.GridNioFilter;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
+import org.apache.ignite.lang.IgniteInClosure;
/**
* Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC)
@@ -190,7 +192,10 @@ public class HadoopIpcToNioAdapter<T> {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) {
assert ses == HadoopIpcToNioAdapter.this.ses : "ses=" + ses +
", this.ses=" + HadoopIpcToNioAdapter.this.ses;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
index 24bba88..d90a900 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
/**
@@ -57,12 +59,16 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+ Object msg,
+ boolean fut,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
- return proceedSessionWrite(ses, U.marshal(marsh, msg), fut);
+ return proceedSessionWrite(ses, U.marshal(marsh, msg), fut, ackC);
}
+ /** {@inheritDoc} */
@Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
assert msg instanceof byte[];
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
index 40e563c..8d15e5e 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.yardstick.cache.model.SampleValue;
/**
- * Ignite benchmark that performs invoke operations.
+ * Ignite benchmark that performs getAndPut operations.
*/
public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
index 49ae985..0a3794c 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.ignite.yardstick.cache.model.SampleValue;
import org.yardstickframework.BenchmarkConfiguration;
/**
- * Ignite benchmark that performs invoke operations.
+ * Ignite benchmark that performs getAndPut operations.
*/
public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** */