You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/18 04:29:36 UTC

[28/50] [abbrv] ignite git commit: IGNITE-1400 On node stop prevent exchange worker hang on topology lock

IGNITE-1400 On node stop prevent exchange worker hang on topology lock


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a81cce72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a81cce72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a81cce72

Branch: refs/heads/ignite-843
Commit: a81cce7214c966de0281ef82da0b1fe042842911
Parents: 367d805
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 15:46:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 15:49:42 2015 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        | 104 +++++++++++--------
 .../dht/GridDhtPartitionTopology.java           |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   7 +-
 .../ignite/internal/util/IgniteUtils.java       |  16 +++
 4 files changed, 83 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 21a7b3b..5e3cc0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -110,7 +111,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
         log = cctx.logger(getClass());
 
-        beforeExchange(exchFut);
+        lock.writeLock().lock();
+
+        try {
+            beforeExchange0(cctx.localNode(), exchFut);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /**
@@ -154,8 +162,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut,
         long updSeq,
         boolean stopping
-    ) {
-        lock.writeLock().lock();
+    ) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
 
         try {
             assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -208,67 +216,75 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         ClusterNode loc = cctx.localNode();
 
-        lock.writeLock().lock();
+        U.writeLock(lock);
 
         try {
             if (stopping)
                 return;
 
-            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+            beforeExchange0(loc, exchFut);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
 
-            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
+    /**
+     * @param loc Local node.
+     * @param exchFut Exchange future.
+     */
+    private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
+        GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            if (!exchId.isJoined())
-                removeNode(exchId.nodeId());
+        assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+            topVer + ", exchId=" + exchId + ']';
 
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        if (!exchId.isJoined())
+            removeNode(exchId.nodeId());
 
-            assert oldest != null;
+        // In case if node joins, get topology at the time of joining node.
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+        assert oldest != null;
 
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+        if (log.isDebugEnabled())
+            log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+        long updateSeq = this.updateSeq.incrementAndGet();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+        // If this is the oldest node.
+        if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+            if (node2part == null) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Created brand new full topology map on oldest node [exchId=" +
+                        exchId + ", fullMap=" + fullMapString() + ']');
             }
+            else if (!node2part.valid()) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-            consistencyCheck();
+                if (log.isDebugEnabled())
+                    log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+                        node2part + ']');
+            }
+            else if (!node2part.nodeId().equals(loc.id())) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
+                if (log.isDebugEnabled())
+                    log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+                        exchId + ", fullMap=" + fullMapString() + ']');
+            }
         }
+
+        consistencyCheck();
+
+        if (log.isDebugEnabled())
+            log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+                fullMapString() + ']');
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d4ea3d6..d642314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -50,13 +51,14 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
         GridDhtPartitionExchangeId exchId,
         GridDhtPartitionsExchangeFuture exchFut,
         long updateSeq,
         boolean stopping
-    );
+    ) throws IgniteInterruptedCheckedException;
 
     /**
      * Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fcb012f..a0c9c88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -205,8 +206,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut,
         long updSeq,
         boolean stopping
-    ) {
-        lock.writeLock().lock();
+    ) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
 
         try {
             assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -267,7 +268,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         int num = cctx.affinity().partitions();
 
-        lock.writeLock().lock();
+        U.writeLock(lock);
 
         try {
             GridDhtPartitionExchangeId exchId = exchFut.exchangeId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ba918f6..e5090cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -119,6 +119,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.jar.JarFile;
 import java.util.logging.ConsoleHandler;
 import java.util.logging.Handler;
@@ -9286,4 +9287,19 @@ public abstract class IgniteUtils {
 
         return hasShmem;
     }
+
+    /**
+     * @param lock Lock.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public static void writeLock(ReadWriteLock lock) throws IgniteInterruptedCheckedException {
+        try {
+            lock.writeLock().lockInterruptibly();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
 }
\ No newline at end of file