You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/17 08:54:42 UTC

[33/41] ignite git commit: IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock. Reviewed and merged by Denis Magda.

IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock.
Reviewed and merged by Denis Magda.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/314794bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/314794bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/314794bf

Branch: refs/heads/ignite-3331
Commit: 314794bfc71a04a3c185abdbafe1f81b17cc0ec4
Parents: fb23e00
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 18:43:22 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 18:43:22 2016 +0300

----------------------------------------------------------------------
 .../GridCacheCountDownLatchImpl.java            |  54 ++++++-
 .../IgniteCountDownLatchAbstractSelfTest.java   | 156 ++++++++++++++++++-
 2 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/314794bf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index c984ab3..5adeb38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -26,7 +26,7 @@ import java.io.ObjectStreamException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
@@ -51,6 +51,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Internal latch is in uninitialized state. */
+    private static final int UNINITIALIZED_LATCH_STATE = 0;
+
+    /** Internal latch is being created. */
+    private static final int CREATING_LATCH_STATE = 1;
+
+    /** Internal latch is ready for the usage. */
+    private static final int READY_LATCH_STATE = 2;
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
         new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
@@ -84,14 +93,17 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private boolean autoDel;
 
     /** Internal latch (transient). */
-    private volatile CountDownLatch internalLatch;
+    private CountDownLatch internalLatch;
 
     /** Initialization guard. */
-    private final AtomicBoolean initGuard = new AtomicBoolean();
+    private final AtomicInteger initGuard = new AtomicInteger();
 
     /** Initialization latch. */
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
+    /** Latest latch value received while the internal latch is still being initialized. */
+    private Integer lastLatchVal = null;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -237,15 +249,36 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     @Override public void onUpdate(int cnt) {
         assert cnt >= 0;
 
-        while (internalLatch != null && internalLatch.getCount() > cnt)
-            internalLatch.countDown();
+        CountDownLatch latch0;
+
+        synchronized (initGuard) {
+            int state = initGuard.get();
+
+            if (state != READY_LATCH_STATE) {
+                /** Internal latch is not fully initialized yet. Remember latest latch value. */
+                lastLatchVal = cnt;
+
+                return;
+            }
+
+            /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+            latch0 = internalLatch;
+        }
+
+        /** Internal latch is fully initialized and ready for the usage. */
+
+        assert latch0 != null;
+
+        while (latch0.getCount() > cnt)
+            latch0.countDown();
+
     }
 
     /**
      * @throws IgniteCheckedException If operation failed.
      */
     private void initializeLatch() throws IgniteCheckedException {
-        if (initGuard.compareAndSet(false, true)) {
+        if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
             try {
                 internalLatch = CU.outTx(
                     retryTopologySafe(new Callable<CountDownLatch>() {
@@ -269,6 +302,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                     ctx
                 );
 
+                synchronized (initGuard) {
+                    if (lastLatchVal != null) {
+                        while (internalLatch.getCount() > lastLatchVal)
+                            internalLatch.countDown();
+                    }
+
+                    initGuard.set(READY_LATCH_STATE);
+                }
+
                 if (log.isDebugEnabled())
                     log.debug("Initialized internal latch: " + internalLatch);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/314794bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 2f6f6f4..f6d0287 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -22,21 +22,26 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -144,7 +149,6 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
 
     /**
      * @param latch Latch.
-     *
      * @throws Exception If failed.
      */
     protected void checkRemovedLatch(final IgniteCountDownLatch latch) throws Exception {
@@ -236,8 +240,8 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
      * @param latchName Latch name.
      * @param cnt Count.
      * @param autoDel Auto delete flag.
-     * @throws Exception If failed.
      * @return New latch.
+     * @throws Exception If failed.
      */
     private IgniteCountDownLatch createLatch(String latchName, int cnt, boolean autoDel)
         throws Exception {
@@ -334,6 +338,58 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
     /**
      * @throws Exception If failed.
      */
+    public void testLatchBroadcast() throws Exception {
+        Ignite ignite = grid(0);
+        ClusterGroup srvsGrp = ignite.cluster().forServers();
+
+        int numOfSrvs = srvsGrp.nodes().size();
+
+        ignite.destroyCache("testCache");
+        IgniteCache<Object, Object> cache = ignite.createCache("testCache");
+
+        for (ClusterNode node : srvsGrp.nodes())
+            cache.put(String.valueOf(node.id()), 0);
+
+        for (int i = 0; i < 500; i++) {
+            IgniteCountDownLatch latch1 = createLatch1(ignite, numOfSrvs);
+            IgniteCountDownLatch latch2 = createLatch2(ignite, numOfSrvs);
+
+            ignite.compute(srvsGrp).broadcast(new IgniteRunnableJob(latch1, latch2, i));
+            assertTrue(latch2.await(10000));
+        }
+    }
+
+    /**
+     * @param client Ignite client.
+     * @param numOfSrvs Number of server nodes.
+     * @return Ignite latch.
+     */
+    private IgniteCountDownLatch createLatch1(Ignite client, int numOfSrvs) {
+        return client.countDownLatch(
+            "testName1", // Latch name.
+            numOfSrvs,          // Initial count.
+            true,        // Auto remove, when counter has reached zero.
+            true         // Create if it does not exist.
+        );
+    }
+
+    /**
+     * @param client Ignite client.
+     * @param numOfSrvs Number of server nodes.
+     * @return Ignite latch.
+     */
+    private IgniteCountDownLatch createLatch2(Ignite client, int numOfSrvs) {
+        return client.countDownLatch(
+            "testName2", // Latch name.
+            numOfSrvs,          // Initial count.
+            true,        // Auto remove, when counter has reached zero.
+            true         // Create if it does not exist.
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testLatchMultinode2() throws Exception {
         if (gridCount() == 1)
             return;
@@ -391,4 +447,100 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         // No-op.
     }
+
+    /**
+     * Ignite job that verifies cache values and coordinates with other nodes via two latches.
+     */
+    public class IgniteRunnableJob implements IgniteRunnable {
+
+        /**
+         * Ignite.
+         */
+        @IgniteInstanceResource
+        Ignite igniteInstance;
+
+        /**
+         * Iteration number.
+         */
+        protected final int iteration;
+
+        /**
+         * Ignite latch 1.
+         */
+        private final IgniteCountDownLatch latch1;
+
+        /**
+         * Ignite latch 2.
+         */
+        private final IgniteCountDownLatch latch2;
+
+        /**
+         * @param latch1 Ignite latch 1.
+         * @param latch2 Ignite latch 2.
+         * @param iteration Iteration number.
+         */
+        public IgniteRunnableJob(IgniteCountDownLatch latch1, IgniteCountDownLatch latch2, int iteration) {
+            this.iteration = iteration;
+            this.latch1 = latch1;
+            this.latch2 = latch2;
+        }
+
+        /**
+         * @return Ignite latch 1.
+         */
+        IgniteCountDownLatch createLatch1() {
+            return latch1;
+        }
+
+        /**
+         * @return Ignite latch 2.
+         */
+        IgniteCountDownLatch createLatch2() {
+            return latch2;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void run() {
+
+            IgniteCountDownLatch latch1 = createLatch1();
+            IgniteCountDownLatch latch2 = createLatch2();
+
+            IgniteCache<Object, Object> cache = igniteInstance.cache("testCache");
+
+            for (ClusterNode node : igniteInstance.cluster().forServers().nodes()) {
+                Integer val = (Integer)cache.get(String.valueOf(node.id()));
+                assertEquals(val, (Integer)iteration);
+            }
+
+            latch1.countDown();
+
+            assertTrue(latch1.await(10000));
+
+            cache.put(getUID(), (iteration + 1));
+
+            latch2.countDown();
+
+        }
+
+        /**
+         * @return Node UUID as string.
+         */
+        String getUID() {
+            String id = "";
+            Collection<ClusterNode> nodes = igniteInstance.cluster().forLocal().nodes();
+            for (ClusterNode node : nodes) {
+                if (node.isLocal())
+                    id = String.valueOf(node.id());
+            }
+            return id;
+        }
+
+        /**
+         * @return Ignite.
+         */
+        public Ignite igniteInstance() {
+            return igniteInstance;
+        }
+    }
 }