You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/20 12:53:13 UTC
[33/50] ignite git commit: IGNITE-3151: Using IgniteCountDownLatch
sometimes drives to dead lock. Reviewed and merged by Denis Magda.
IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock.
Reviewed and merged by Denis Magda.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/314794bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/314794bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/314794bf
Branch: refs/heads/ignite-3341
Commit: 314794bfc71a04a3c185abdbafe1f81b17cc0ec4
Parents: fb23e00
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 18:43:22 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 18:43:22 2016 +0300
----------------------------------------------------------------------
.../GridCacheCountDownLatchImpl.java | 54 ++++++-
.../IgniteCountDownLatchAbstractSelfTest.java | 156 ++++++++++++++++++-
2 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/314794bf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index c984ab3..5adeb38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -26,7 +26,7 @@ import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
@@ -51,6 +51,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** */
private static final long serialVersionUID = 0L;
+ /** Internal latch is in uninitialized state. */
+ private static final int UNINITIALIZED_LATCH_STATE = 0;
+
+ /** Internal latch is being created. */
+ private static final int CREATING_LATCH_STATE = 1;
+
+ /** Internal latch is ready for use. */
+ private static final int READY_LATCH_STATE = 2;
+
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
@@ -84,14 +93,17 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private boolean autoDel;
/** Internal latch (transient). */
- private volatile CountDownLatch internalLatch;
+ private CountDownLatch internalLatch;
/** Initialization guard. */
- private final AtomicBoolean initGuard = new AtomicBoolean();
+ private final AtomicInteger initGuard = new AtomicInteger();
/** Initialization latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
+ /** Latest count passed to {@code onUpdate} while the internal latch was not yet ready; applied once the latch is created. */
+ private Integer lastLatchVal = null;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -237,15 +249,36 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
@Override public void onUpdate(int cnt) {
assert cnt >= 0;
- while (internalLatch != null && internalLatch.getCount() > cnt)
- internalLatch.countDown();
+ CountDownLatch latch0;
+
+ synchronized (initGuard) {
+ int state = initGuard.get();
+
+ if (state != READY_LATCH_STATE) {
+ /** Internal latch is not fully initialized yet. Remember latest latch value. */
+ lastLatchVal = cnt;
+
+ return;
+ }
+
+ /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+ latch0 = internalLatch;
+ }
+
+ /** Internal latch is fully initialized and ready for use. */
+
+ assert latch0 != null;
+
+ while (latch0.getCount() > cnt)
+ latch0.countDown();
+
}
/**
* @throws IgniteCheckedException If operation failed.
*/
private void initializeLatch() throws IgniteCheckedException {
- if (initGuard.compareAndSet(false, true)) {
+ if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
try {
internalLatch = CU.outTx(
retryTopologySafe(new Callable<CountDownLatch>() {
@@ -269,6 +302,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
ctx
);
+ synchronized (initGuard) {
+ if (lastLatchVal != null) {
+ while (internalLatch.getCount() > lastLatchVal)
+ internalLatch.countDown();
+ }
+
+ initGuard.set(READY_LATCH_STATE);
+ }
+
if (log.isDebugEnabled())
log.debug("Initialized internal latch: " + internalLatch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/314794bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 2f6f6f4..f6d0287 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -22,21 +22,26 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestUtils;
@@ -144,7 +149,6 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
/**
* @param latch Latch.
- *
* @throws Exception If failed.
*/
protected void checkRemovedLatch(final IgniteCountDownLatch latch) throws Exception {
@@ -236,8 +240,8 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
* @param latchName Latch name.
* @param cnt Count.
* @param autoDel Auto delete flag.
- * @throws Exception If failed.
* @return New latch.
+ * @throws Exception If failed.
*/
private IgniteCountDownLatch createLatch(String latchName, int cnt, boolean autoDel)
throws Exception {
@@ -334,6 +338,58 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
/**
* @throws Exception If failed.
*/
+ public void testLatchBroadcast() throws Exception {
+ Ignite ignite = grid(0);
+ ClusterGroup srvsGrp = ignite.cluster().forServers();
+
+ int numOfSrvs = srvsGrp.nodes().size();
+
+ ignite.destroyCache("testCache");
+ IgniteCache<Object, Object> cache = ignite.createCache("testCache");
+
+ for (ClusterNode node : srvsGrp.nodes())
+ cache.put(String.valueOf(node.id()), 0);
+
+ for (int i = 0; i < 500; i++) {
+ IgniteCountDownLatch latch1 = createLatch1(ignite, numOfSrvs);
+ IgniteCountDownLatch latch2 = createLatch2(ignite, numOfSrvs);
+
+ ignite.compute(srvsGrp).broadcast(new IgniteRunnableJob(latch1, latch2, i));
+ assertTrue(latch2.await(10000));
+ }
+ }
+
+ /**
+ * @param client Ignite client.
+ * @param numOfSrvs Number of server nodes.
+ * @return Ignite latch.
+ */
+ private IgniteCountDownLatch createLatch1(Ignite client, int numOfSrvs) {
+ return client.countDownLatch(
+ "testName1", // Latch name.
+ numOfSrvs, // Initial count.
+ true, // Auto remove, when counter has reached zero.
+ true // Create if it does not exist.
+ );
+ }
+
+ /**
+ * @param client Ignite client.
+ * @param numOfSrvs Number of server nodes.
+ * @return Ignite latch.
+ */
+ private IgniteCountDownLatch createLatch2(Ignite client, int numOfSrvs) {
+ return client.countDownLatch(
+ "testName2", // Latch name.
+ numOfSrvs, // Initial count.
+ true, // Auto remove, when counter has reached zero.
+ true // Create if it does not exist.
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testLatchMultinode2() throws Exception {
if (gridCount() == 1)
return;
@@ -391,4 +447,100 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// No-op.
}
+
+ /**
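+ * Ignite job that checks the cached per-node values, counts down and awaits latch 1, stores the next value and counts down latch 2.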
+ */
+ public class IgniteRunnableJob implements IgniteRunnable {
+
+ /**
+ * Ignite.
+ */
+ @IgniteInstanceResource
+ Ignite igniteInstance;
+
+ /**
+ * Iteration number.
+ */
+ protected final int iteration;
+
+ /**
+ * Ignite latch 1.
+ */
+ private final IgniteCountDownLatch latch1;
+
+ /**
+ * Ignite latch 2.
+ */
+ private final IgniteCountDownLatch latch2;
+
+ /**
+ * @param latch1 Ignite latch 1.
+ * @param latch2 Ignite latch 2.
+ * @param iteration Iteration number.
+ */
+ public IgniteRunnableJob(IgniteCountDownLatch latch1, IgniteCountDownLatch latch2, int iteration) {
+ this.iteration = iteration;
+ this.latch1 = latch1;
+ this.latch2 = latch2;
+ }
+
+ /**
+ * @return Ignite latch 1 that was passed to the constructor.
+ */
+ IgniteCountDownLatch createLatch1() {
+ return latch1;
+ }
+
+ /**
+ * @return Ignite latch 2 that was passed to the constructor.
+ */
+ IgniteCountDownLatch createLatch2() {
+ return latch2;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run() {
+
+ IgniteCountDownLatch latch1 = createLatch1();
+ IgniteCountDownLatch latch2 = createLatch2();
+
+ IgniteCache<Object, Object> cache = igniteInstance.cache("testCache");
+
+ for (ClusterNode node : igniteInstance.cluster().forServers().nodes()) {
+ Integer val = (Integer)cache.get(String.valueOf(node.id()));
+ assertEquals(val, (Integer)iteration);
+ }
+
+ latch1.countDown();
+
+ assertTrue(latch1.await(10000));
+
+ cache.put(getUID(), (iteration + 1));
+
+ latch2.countDown();
+
+ }
+
+ /**
+ * @return Node UUID as string.
+ */
+ String getUID() {
+ String id = "";
+ Collection<ClusterNode> nodes = igniteInstance.cluster().forLocal().nodes();
+ for (ClusterNode node : nodes) {
+ if (node.isLocal())
+ id = String.valueOf(node.id());
+ }
+ return id;
+ }
+
+ /**
+ * @return Ignite.
+ */
+ public Ignite igniteInstance() {
+ return igniteInstance;
+ }
+ }
}