You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 15:10:20 UTC

[1/4] ignite git commit: IGNITE-1400 On node stop prevent exchange worker hang on topology lock

Repository: ignite
Updated Branches:
  refs/heads/master c1ac19df8 -> 277b76d7a


IGNITE-1400 On node stop prevent exchange worker hang on topology lock


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a81cce72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a81cce72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a81cce72

Branch: refs/heads/master
Commit: a81cce7214c966de0281ef82da0b1fe042842911
Parents: 367d805
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 15:46:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 15:49:42 2015 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        | 104 +++++++++++--------
 .../dht/GridDhtPartitionTopology.java           |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   7 +-
 .../ignite/internal/util/IgniteUtils.java       |  16 +++
 4 files changed, 83 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 21a7b3b..5e3cc0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -110,7 +111,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
         log = cctx.logger(getClass());
 
-        beforeExchange(exchFut);
+        lock.writeLock().lock();
+
+        try {
+            beforeExchange0(cctx.localNode(), exchFut);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /**
@@ -154,8 +162,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut,
         long updSeq,
         boolean stopping
-    ) {
-        lock.writeLock().lock();
+    ) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
 
         try {
             assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -208,67 +216,75 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         ClusterNode loc = cctx.localNode();
 
-        lock.writeLock().lock();
+        U.writeLock(lock);
 
         try {
             if (stopping)
                 return;
 
-            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+            beforeExchange0(loc, exchFut);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
 
-            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
+    /**
+     * @param loc Local node.
+     * @param exchFut Exchange future.
+     */
+    private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
+        GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            if (!exchId.isJoined())
-                removeNode(exchId.nodeId());
+        assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+            topVer + ", exchId=" + exchId + ']';
 
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        if (!exchId.isJoined())
+            removeNode(exchId.nodeId());
 
-            assert oldest != null;
+        // In case if node joins, get topology at the time of joining node.
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+        assert oldest != null;
 
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+        if (log.isDebugEnabled())
+            log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+        long updateSeq = this.updateSeq.incrementAndGet();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+        // If this is the oldest node.
+        if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+            if (node2part == null) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Created brand new full topology map on oldest node [exchId=" +
+                        exchId + ", fullMap=" + fullMapString() + ']');
             }
+            else if (!node2part.valid()) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-            consistencyCheck();
+                if (log.isDebugEnabled())
+                    log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+                        node2part + ']');
+            }
+            else if (!node2part.nodeId().equals(loc.id())) {
+                node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
+                if (log.isDebugEnabled())
+                    log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+                        exchId + ", fullMap=" + fullMapString() + ']');
+            }
         }
+
+        consistencyCheck();
+
+        if (log.isDebugEnabled())
+            log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+                fullMapString() + ']');
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d4ea3d6..d642314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -50,13 +51,14 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
         GridDhtPartitionExchangeId exchId,
         GridDhtPartitionsExchangeFuture exchFut,
         long updateSeq,
         boolean stopping
-    );
+    ) throws IgniteInterruptedCheckedException;
 
     /**
      * Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fcb012f..a0c9c88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -205,8 +206,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut,
         long updSeq,
         boolean stopping
-    ) {
-        lock.writeLock().lock();
+    ) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
 
         try {
             assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -267,7 +268,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         int num = cctx.affinity().partitions();
 
-        lock.writeLock().lock();
+        U.writeLock(lock);
 
         try {
             GridDhtPartitionExchangeId exchId = exchFut.exchangeId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ba918f6..e5090cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -119,6 +119,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.jar.JarFile;
 import java.util.logging.ConsoleHandler;
 import java.util.logging.Handler;
@@ -9286,4 +9287,19 @@ public abstract class IgniteUtils {
 
         return hasShmem;
     }
+
+    /**
+     * @param lock Lock.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public static void writeLock(ReadWriteLock lock) throws IgniteInterruptedCheckedException {
+        try {
+            lock.writeLock().lockInterruptibly();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
 }
\ No newline at end of file


[4/4] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1.4'

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1.4'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/277b76d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/277b76d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/277b76d7

Branch: refs/heads/master
Commit: 277b76d7a0c244f7f1ecba018f55f584027fadfa
Parents: c1ac19d 6e19979
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 16:08:12 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 16:08:12 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../dht/GridClientPartitionTopology.java        | 104 +++++++++++--------
 .../dht/GridDhtPartitionTopology.java           |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   7 +-
 .../ignite/internal/util/IgniteUtils.java       |  16 +++
 5 files changed, 87 insertions(+), 54 deletions(-)
----------------------------------------------------------------------



[3/4] ignite git commit: Fixed RendezvousAffinityFunction.hashIdResolver compatibility.

Posted by sb...@apache.org.
Fixed RendezvousAffinityFunction.hashIdResolver compatibility.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e19979b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e19979b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e19979b

Branch: refs/heads/master
Commit: 6e19979b329d266d3be2f4abe2bdbd8dd443d64d
Parents: c257e07
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 15:57:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 15:57:03 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/GridCacheProcessor.java     | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e19979b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4317f70..4ae0baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -232,16 +232,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (cfg.getCacheMode() == PARTITIONED) {
                 RendezvousAffinityFunction aff = new RendezvousAffinityFunction();
 
-                if (internalCache)
-                    aff.setHashIdResolver(new AffinityNodeAddressHashResolver());
+                aff.setHashIdResolver(new AffinityNodeAddressHashResolver());
 
                 cfg.setAffinity(aff);
             }
             else if (cfg.getCacheMode() == REPLICATED) {
                 RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512);
 
-                if (internalCache)
-                    aff.setHashIdResolver(new AffinityNodeAddressHashResolver());
+                aff.setHashIdResolver(new AffinityNodeAddressHashResolver());
 
                 cfg.setAffinity(aff);
 
@@ -251,11 +249,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cfg.setAffinity(new LocalAffinityFunction());
         }
         else {
-            if (cfg.getCacheMode() == PARTITIONED) {
+            if (cfg.getCacheMode() != LOCAL) {
                 if (cfg.getAffinity() instanceof RendezvousAffinityFunction) {
                     RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity();
 
-                    if (internalCache && aff.getHashIdResolver() == null)
+                    if (aff.getHashIdResolver() == null)
                         aff.setHashIdResolver(new AffinityNodeAddressHashResolver());
                 }
             }


[2/4] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c257e070
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c257e070
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c257e070

Branch: refs/heads/master
Commit: c257e070b1fcf29de4ac7192f8873c4e422b5d7e
Parents: a81cce7 2aa292c
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 15:55:17 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 15:55:17 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsFileWorkerBatch.java    | 15 ++++-
 .../processors/igfs/IgfsAbstractSelfTest.java   | 60 +++++++++-----------
 .../igfs/IgfsDualAbstractSelfTest.java          | 10 +++-
 .../testsuites/IgniteHadoopTestSuite.java       |  6 +-
 4 files changed, 52 insertions(+), 39 deletions(-)
----------------------------------------------------------------------