You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/05 09:45:59 UTC

ignite git commit: IGNITE-9675 Fixed deadlock on Ignite#active() and concurrent node stop - Fixes #4822.

Repository: ignite
Updated Branches:
  refs/heads/master 78c2d3bbb -> 754c7337d


IGNITE-9675 Fixed deadlock on Ignite#active() and concurrent node stop - Fixes #4822.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/754c7337
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/754c7337
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/754c7337

Branch: refs/heads/master
Commit: 754c7337de123ac44e2816d2a55ab6f76cd03eac
Parents: 78c2d3b
Author: Alexey Platonov <ap...@gmail.com>
Authored: Fri Oct 5 11:57:07 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Oct 5 12:45:45 2018 +0300

----------------------------------------------------------------------
 .../cluster/GridClusterStateProcessor.java      | 50 ++++++++++++--------
 .../cluster/IGridClusterStateProcessor.java     |  6 +++
 .../processors/task/GridTaskProcessor.java      | 25 +++++++++-
 .../processors/igfs/IgfsIgniteMock.java         |  8 ++--
 4 files changed, 64 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index c4a3126..2b70998 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -59,8 +59,11 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadW
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -173,6 +176,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
     /** {@inheritDoc} */
     @Override public boolean publicApiActiveState(boolean waitForTransition) {
+        return publicApiActiveStateAsync(waitForTransition).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> publicApiActiveStateAsync(boolean asyncWaitForTransition) {
         if (ctx.isDaemon())
             return sendComputeCheckGlobalState();
 
@@ -184,32 +192,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             Boolean transitionRes = globalState.transitionResult();
 
             if (transitionRes != null)
-                return transitionRes;
+                return new IgniteFinishedFutureImpl<>(transitionRes);
             else {
-                if (waitForTransition) {
-                    GridFutureAdapter<Void> fut = transitionFuts.get(globalState.transitionRequestId());
+                GridFutureAdapter<Void> fut = transitionFuts.get(globalState.transitionRequestId());
+                if (fut != null) {
+                    if (asyncWaitForTransition) {
+                         return new IgniteFutureImpl<>(fut.chain(new C1<IgniteInternalFuture<Void>, Boolean>() {
+                            @Override public Boolean apply(IgniteInternalFuture<Void> fut) {
+                                Boolean res = globalState.transitionResult();
 
-                    if (fut != null) {
-                        try {
-                            fut.get();
-                        }
-                        catch (IgniteCheckedException ex) {
-                            throw new IgniteException(ex);
-                        }
+                                assert res != null;
+
+                                return res;
+                            }
+                        }));
                     }
+                    else
+                        return new IgniteFinishedFutureImpl<>(false);
+                }
 
-                    transitionRes = globalState.transitionResult();
+                transitionRes = globalState.transitionResult();
 
-                    assert transitionRes != null;
+                assert transitionRes != null;
 
-                    return transitionRes;
-                }
-                else
-                    return false;
+                return new IgniteFinishedFutureImpl<>(transitionRes);
             }
         }
         else
-            return globalState.active();
+            return new IgniteFinishedFutureImpl<>(globalState.active());
     }
 
     /** {@inheritDoc} */
@@ -1066,7 +1076,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
      *
      *  @return Cluster state, {@code True} if cluster active, {@code False} if inactive.
      */
-    private boolean sendComputeCheckGlobalState() {
+    private IgniteFuture<Boolean> sendComputeCheckGlobalState() {
         AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
         if (log.isInfoEnabled()) {
@@ -1079,11 +1089,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers();
 
         if (F.isEmpty(clusterGroupAdapter.nodes()))
-            return false;
+            return new IgniteFinishedFutureImpl<>(false);
 
         IgniteCompute comp = clusterGroupAdapter.compute();
 
-        return comp.call(new IgniteCallable<Boolean>() {
+        return comp.callAsync(new IgniteCallable<Boolean>() {
             @IgniteInstanceResource
             private Ignite ig;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
index bc72a51..d71b4cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
+import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,6 +42,11 @@ public interface IGridClusterStateProcessor extends GridProcessor {
     boolean publicApiActiveState(boolean waitForTransition);
 
     /**
+     * @return Cluster state to be used on public API.
+     */
+    IgniteFuture<Boolean> publicApiActiveStateAsync(boolean waitForTransition);
+
+    /**
      * @param discoCache Discovery data cache.
      * @return If transition is in progress returns future which is completed when transition finishes.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9007472..313f6c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -69,6 +71,7 @@ import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -187,7 +190,24 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
     /** {@inheritDoc} */
     @SuppressWarnings("TooBroadScope")
     @Override public void onKernalStop(boolean cancel) {
-        lock.writeLock();
+        boolean interrupted = false;
+
+        while (true) {
+            try {
+                if (lock.tryWriteLock(1, TimeUnit.SECONDS))
+                    break;
+                else {
+                    LT.warn(log, "Still waiting to acquire write lock on stop");
+
+                    U.sleep(50);
+                }
+            }
+            catch (IgniteInterruptedCheckedException | InterruptedException e) {
+                LT.warn(log, "Stopping thread was interrupted while waiting for write lock (will wait anyway)");
+
+                interrupted = true;
+            }
+        }
 
         try {
             stopping = true;
@@ -196,6 +216,9 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
         }
         finally {
             lock.writeUnlock();
+
+            if (interrupted)
+                Thread.currentThread().interrupt();
         }
 
         startLatch.countDown();

http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index a0ce285..5b25116 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -182,13 +182,13 @@ public class IgfsIgniteMock implements IgniteEx {
         return null;
     }
 
-    @Override
-    public boolean isRebalanceEnabled() {
+    /** {@inheritDoc} */
+    @Override public boolean isRebalanceEnabled() {
         return true;
     }
 
-    @Override
-    public void rebalanceEnabled(boolean rebalanceEnabled) {
+    /** {@inheritDoc} */
+    @Override public void rebalanceEnabled(boolean rebalanceEnabled) {
         throwUnsupported();
     }