You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 19:26:14 UTC
[42/50] [abbrv] ignite git commit: IGNITE-1400 On node stop prevent
exchange worker hang on topology lock
IGNITE-1400 On node stop prevent exchange worker hang on topology lock
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a81cce72
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a81cce72
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a81cce72
Branch: refs/heads/ignite-1093-2
Commit: a81cce7214c966de0281ef82da0b1fe042842911
Parents: 367d805
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 15:46:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 15:49:42 2015 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 104 +++++++++++--------
.../dht/GridDhtPartitionTopology.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 7 +-
.../ignite/internal/util/IgniteUtils.java | 16 +++
4 files changed, 83 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 21a7b3b..5e3cc0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -110,7 +111,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log = cctx.logger(getClass());
- beforeExchange(exchFut);
+ lock.writeLock().lock();
+
+ try {
+ beforeExchange0(cctx.localNode(), exchFut);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
}
/**
@@ -154,8 +162,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
GridDhtPartitionsExchangeFuture exchFut,
long updSeq,
boolean stopping
- ) {
- lock.writeLock().lock();
+ ) throws IgniteInterruptedCheckedException {
+ U.writeLock(lock);
try {
assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -208,67 +216,75 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
ClusterNode loc = cctx.localNode();
- lock.writeLock().lock();
+ U.writeLock(lock);
try {
if (stopping)
return;
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ beforeExchange0(loc, exchFut);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
- topVer + ", exchId=" + exchId + ']';
+ /**
+ * @param loc Local node.
+ * @param exchFut Exchange future.
+ */
+ private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
+ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
- if (!exchId.isJoined())
- removeNode(exchId.nodeId());
+ assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+ topVer + ", exchId=" + exchId + ']';
- // In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ if (!exchId.isJoined())
+ removeNode(exchId.nodeId());
- assert oldest != null;
+ // In case if node joins, get topology at the time of joining node.
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
- if (log.isDebugEnabled())
- log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+ assert oldest != null;
- long updateSeq = this.updateSeq.incrementAndGet();
-
- // If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
- if (node2part == null) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+ if (log.isDebugEnabled())
+ log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
- if (log.isDebugEnabled())
- log.debug("Created brand new full topology map on oldest node [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
- }
- else if (!node2part.valid()) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ long updateSeq = this.updateSeq.incrementAndGet();
- if (log.isDebugEnabled())
- log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
- node2part + ']');
- }
- else if (!node2part.nodeId().equals(loc.id())) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ // If this is the oldest node.
+ if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+ if (node2part == null) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
- if (log.isDebugEnabled())
- log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
- }
+ if (log.isDebugEnabled())
+ log.debug("Created brand new full topology map on oldest node [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
}
+ else if (!node2part.valid()) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- consistencyCheck();
+ if (log.isDebugEnabled())
+ log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+ node2part + ']');
+ }
+ else if (!node2part.nodeId().equals(loc.id())) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
- fullMapString() + ']');
- }
- finally {
- lock.writeLock().unlock();
+ if (log.isDebugEnabled())
+ log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
}
+
+ consistencyCheck();
+
+ if (log.isDebugEnabled())
+ log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+ fullMapString() + ']');
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d4ea3d6..d642314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -50,13 +51,14 @@ public interface GridDhtPartitionTopology {
*
* @param exchId Exchange ID.
* @param exchFut Exchange future.
+ * @throws IgniteInterruptedCheckedException If interrupted.
*/
public void updateTopologyVersion(
GridDhtPartitionExchangeId exchId,
GridDhtPartitionsExchangeFuture exchFut,
long updateSeq,
boolean stopping
- );
+ ) throws IgniteInterruptedCheckedException;
/**
* Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fcb012f..a0c9c88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -205,8 +206,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionsExchangeFuture exchFut,
long updSeq,
boolean stopping
- ) {
- lock.writeLock().lock();
+ ) throws IgniteInterruptedCheckedException {
+ U.writeLock(lock);
try {
assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
@@ -267,7 +268,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
- lock.writeLock().lock();
+ U.writeLock(lock);
try {
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a81cce72/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ba918f6..e5090cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -119,6 +119,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.jar.JarFile;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
@@ -9286,4 +9287,19 @@ public abstract class IgniteUtils {
return hasShmem;
}
+
+ /**
+ * @param lock Lock.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ public static void writeLock(ReadWriteLock lock) throws IgniteInterruptedCheckedException {
+ try {
+ lock.writeLock().lockInterruptibly();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
}
\ No newline at end of file