You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/08 10:55:57 UTC

ignite git commit: ignite-1183 Fixed data structures create/destroy from client node

Repository: ignite
Updated Branches:
  refs/heads/ignite-1183 [created] f4c193e01


ignite-1183 Fixed data structures create/destroy from client node


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4c193e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4c193e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4c193e0

Branch: refs/heads/ignite-1183
Commit: f4c193e01f55a564f9df557aad65fe3027aba898
Parents: f025714
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 8 11:50:05 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 8 11:55:42 2015 +0300

----------------------------------------------------------------------
 .../colocated/GridDhtColocatedLockFuture.java   |  11 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  24 +--
 .../datastructures/DataStructuresProcessor.java |  45 ++++--
 ...gniteAtomicLongChangingTopologySelfTest.java | 155 +++++++++++++++++--
 5 files changed, 199 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c193e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 33a5cbd..be09f54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -598,7 +598,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys, false);
+            map(keys, false, true);
 
             markInitialized();
 
@@ -654,7 +654,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     this.topVer.compareAndSet(null, topVer);
                 }
 
-                map(keys, remap);
+                map(keys, remap, false);
 
                 if (c != null)
                     c.run();
@@ -691,8 +691,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      *
      * @param keys Keys.
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void map(Collection<KeyCacheObject> keys, boolean remap) {
+    private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -819,7 +820,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                     boolean clientFirst = false;
 
                                     if (first) {
-                                        clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+                                        clientFirst = clientNode &&
+                                            !topLocked &&
+                                            (tx == null || !tx.hasRemoteLocks());
 
                                         first = false;
                                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c193e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index dcc8da6..e6b1e02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -718,7 +718,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys, false);
+            map(keys, false, true);
 
             markInitialized();
 
@@ -773,7 +773,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                     this.topVer.compareAndSet(null, topVer);
                 }
 
-                map(keys, remap);
+                map(keys, remap, false);
 
                 markInitialized();
             }
@@ -807,8 +807,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      *
      * @param keys Keys.
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void map(Iterable<KeyCacheObject> keys, boolean remap) {
+    private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
@@ -938,7 +939,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                         boolean clientFirst = false;
 
                                         if (first) {
-                                            clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+                                            clientFirst = clientNode &&
+                                                !topLocked &&
+                                                (tx == null || !tx.hasRemoteLocks());
 
                                             first = false;
                                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c193e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 25028c4..1fb33a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
             cctx.mvcc().addFuture(this);
 
-            prepare0(false);
+            prepare0(false, true);
 
             return;
         }
@@ -338,7 +338,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 return;
             }
 
-            prepare0(remap);
+            prepare0(remap, false);
 
             if (c != null)
                 c.run();
@@ -428,8 +428,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * Initializes future.
      *
      * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
-    private void prepare0(boolean remap) {
+    private void prepare0(boolean remap, boolean topLocked) {
         try {
             boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
 
@@ -451,7 +452,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
-                tx.writeEntries());
+                tx.writeEntries(),
+                topLocked);
 
             markInitialized();
         }
@@ -466,11 +468,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      * @param reads Read entries.
      * @param writes Write entries.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      * @throws IgniteCheckedException If failed.
      */
     private void prepare(
         Iterable<IgniteTxEntry> reads,
-        Iterable<IgniteTxEntry> writes
+        Iterable<IgniteTxEntry> writes,
+        boolean topLocked
     ) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
@@ -497,7 +501,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         GridDistributedTxMapping cur = null;
 
         for (IgniteTxEntry read : reads) {
-            GridDistributedTxMapping updated = map(read, topVer, cur, false);
+            GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -514,7 +518,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         for (IgniteTxEntry write : writes) {
-            GridDistributedTxMapping updated = map(write, topVer, cur, true);
+            GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -647,13 +651,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * @param topVer Topology version.
      * @param cur Current mapping.
      * @param waitLock Wait lock flag.
+     * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      * @return Mapping.
      */
     private GridDistributedTxMapping map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         @Nullable GridDistributedTxMapping cur,
-        boolean waitLock
+        boolean waitLock,
+        boolean topLocked
     ) {
         GridCacheContext cacheCtx = entry.context();
 
@@ -685,7 +691,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
-            boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+            boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
 
             cur = new GridDistributedTxMapping(primary);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c193e0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..8ff35f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -505,14 +505,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 return dataStructure;
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                fut.get();
+            }
         }
     }
 
@@ -593,14 +598,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 if (afterRmv != null && rmvInfo != null)
                     afterRmv.applyx(rmvInfo);
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                fut.get();
+            }
         }
     }
 
@@ -995,14 +1005,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 return col;
             }
-            catch (ClusterTopologyCheckedException e) {
-                IgniteInternalFuture<?> fut = e.retryReadyFuture();
-
-                fut.get();
-            }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
             }
+            catch (IgniteCheckedException e) {
+                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+                if (topErr == null)
+                    throw e;
+
+                IgniteInternalFuture<?> fut = topErr.retryReadyFuture();
+
+                fut.get();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c193e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 337334e..32a86e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -17,21 +17,37 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
 /**
  *
@@ -52,6 +68,9 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,16 +79,18 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
 
         discoSpi.setIpFinder(IP_FINDER);
 
-        cfg.setDiscoverySpi(discoSpi);
+        cfg.setDiscoverySpi(discoSpi).setNetworkTimeout(30_000);
 
         AtomicConfiguration atomicCfg = new AtomicConfiguration();
-        atomicCfg.setCacheMode(CacheMode.PARTITIONED);
+        atomicCfg.setCacheMode(PARTITIONED);
         atomicCfg.setBackups(1);
 
         cfg.setAtomicConfiguration(atomicCfg);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -111,6 +132,110 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testClientAtomicLongCreateCloseFailover() throws Exception {
+        testFailoverWithClient(new IgniteInClosure<Ignite>() { // Closure is applied to the client node.
+            @Override public void apply(Ignite ignite) {
+                for (int i = 0; i < 100; i++) {
+                    IgniteAtomicLong l = ignite.atomicLong("long-" + i, 0, true); // One atomic long per loop index.
+
+                    l.close();
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientQueueCreateCloseFailover() throws Exception {
+        testFailoverWithClient(new IgniteInClosure<Ignite>() { // Closure is applied to the client node.
+            @Override public void apply(Ignite ignite) {
+                for (int i = 0; i < 100; i++) { // 100 create/close cycles per invocation of the closure.
+                    CollectionConfiguration colCfg = new CollectionConfiguration();
+
+                    colCfg.setBackups(1);
+                    colCfg.setCacheMode(PARTITIONED);
+                    colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); // Even i: TRANSACTIONAL, odd i: ATOMIC.
+
+                    IgniteQueue q = ignite.queue("q-" + i, 0, colCfg); // Capacity 0 - presumably unbounded; TODO confirm.
+
+                    q.close();
+                }
+            }
+        });
+    }
+
+    /**
+     * @param c Test iteration closure; applied to the client node in a loop for 30 seconds while servers restart.
+     * @throws Exception If failed.
+     */
+    private void testFailoverWithClient(IgniteInClosure<Ignite> c) throws Exception {
+        startGridsMultiThreaded(GRID_CNT, false);
+
+        client = true; // getConfiguration() passes this flag to setClientMode(), so the next node is a client.
+
+        Ignite ignite = startGrid(GRID_CNT);
+
+        assertTrue(ignite.configuration().isClientMode());
+
+        client = false; // Nodes started from here on (by the restart thread) are not clients.
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = restartThread(finished);
+
+        long stop = System.currentTimeMillis() + 30_000; // Deadline: 30 seconds from now.
+
+        try {
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stop) {
+                log.info("Iteration: " + iter++);
+
+                c.apply(ignite);
+            }
+
+            finished.set(true); // Ask the restart thread to exit...
+
+            fut.get(); // ...and wait for it, surfacing any failure it hit.
+        }
+        finally {
+            finished.set(true); // Also signal the restart thread on failure; it is not awaited on this path.
+        }
+    }
+
+    /**
+     * @param finished Finished flag; polled by the restart loop, which exits once it is {@code true}.
+     * @return Future returned by {@code GridTestUtils.runAsync} for the restart task.
+     */
+    private IgniteInternalFuture<?> restartThread(final AtomicBoolean finished) {
+        return GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!finished.get()) {
+                    for (int i = 0; i < GRID_CNT; i++) { // Cycle through nodes 0..GRID_CNT-1, one at a time.
+                        log.info("Stop node: " + i);
+
+                        stopGrid(i);
+
+                        U.sleep(500); // Pause between stop and start (presumably milliseconds; TODO confirm).
+
+                        log.info("Start node: " + i);
+
+                        startGrid(i);
+
+                        if (finished.get()) // Checked only after the node is started again.
+                            break;
+                    }
+                }
+
+                return null;
+            }
+        }, "restart-thread");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testIncrementConsistency() throws Exception {
         startGrids(GRID_CNT);