You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/21 13:02:23 UTC
[07/15] ignite git commit: ignite-801 and ignite-1911: resurrecting
data structure and atomics failover tests + stopping the node if ring message
worker fails
ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c711484c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c711484c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c711484c
Branch: refs/heads/ignite-1282
Commit: c711484c30315c06ce0b31a8775bfc41b7ee1483
Parents: 8e7e330
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Nov 20 19:11:07 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Nov 20 19:11:07 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 39 +-
.../CacheDataStructuresManager.java | 2 +-
.../GridFutureRemapTimeoutObject.java | 72 --
.../dht/GridPartitionedGetFuture.java | 28 +-
.../distributed/near/GridNearGetFuture.java | 28 +-
.../IgniteTxImplicitSingleStateImpl.java | 29 +-
.../IgniteTxRemoteSingleStateImpl.java | 19 +-
.../datastructures/DataStructuresProcessor.java | 47 +-
.../GridAtomicCacheQueueImpl.java | 126 +--
.../GridCacheAtomicReferenceImpl.java | 10 +-
.../GridCacheCountDownLatchImpl.java | 15 +-
.../datastructures/GridCacheQueueAdapter.java | 32 +-
.../GridTransactionalCacheQueueImpl.java | 193 ++--
.../ignite/spi/discovery/DiscoverySpi.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 36 +
...eAbstractDataStructuresFailoverSelfTest.java | 924 +++++++++----------
...rtitionedDataStructuresFailoverSelfTest.java | 7 +-
...edOffheapDataStructuresFailoverSelfTest.java | 12 +-
...eplicatedDataStructuresFailoverSelfTest.java | 5 -
...gniteAtomicLongChangingTopologySelfTest.java | 2 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 4 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 4 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 90 +-
.../testframework/junits/GridAbstractTest.java | 6 +-
24 files changed, 796 insertions(+), 936 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index f7d115f..89779d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -1780,28 +1782,41 @@ public class GridCacheUtils {
public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
return new Callable<S>() {
@Override public S call() throws Exception {
- int retries = GridCacheAdapter.MAX_RETRIES;
-
IgniteCheckedException err = null;
- for (int i = 0; i < retries; i++) {
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
try {
return c.call();
}
+ catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ throw e;
+ }
+ catch (TransactionRollbackException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ U.sleep(1);
+ }
catch (IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class) ||
- X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
- X.hasCause(e, CachePartialUpdateCheckedException.class)) {
- if (i < retries - 1) {
- err = e;
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
- U.sleep(1);
+ if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
- continue;
- }
+ if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+ ClusterTopologyServerNotFoundException)
+ throw e;
- throw e;
+ // IGNITE-1948: remove this check when the issue is fixed
+ if (topErr.retryReadyFuture() != null)
+ topErr.retryReadyFuture().get();
+ else
+ U.sleep(1);
}
+ else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+ CachePartialUpdateCheckedException.class))
+ U.sleep(1);
else
throw e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 1ff4575..930921b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -770,4 +770,4 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
return "RemoveSetCallable [setId=" + setId + ']';
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
deleted file mode 100644
index 72fdd4b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
-/**
- * Future remap timeout object.
- */
-public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter {
- /** */
- private final GridFutureAdapter<?> fut;
-
- /** Finished flag. */
- private final AtomicBoolean finished = new AtomicBoolean();
-
- /** Topology version to wait. */
- private final AffinityTopologyVersion topVer;
-
- /** Exception cause. */
- private final IgniteCheckedException e;
-
- /**
- * @param fut Future.
- * @param timeout Timeout.
- * @param topVer Topology version timeout was created on.
- * @param e Exception cause.
- */
- public GridFutureRemapTimeoutObject(
- GridFutureAdapter<?> fut,
- long timeout,
- AffinityTopologyVersion topVer,
- IgniteCheckedException e) {
- super(timeout);
-
- this.fut = fut;
- this.topVer = topVer;
- this.e = e;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (finish()) // Fail the whole get future, else remap happened concurrently.
- fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e));
- }
-
- /**
- * @return Guard against concurrent completion.
- */
- public boolean finish() {
- return finished.compareAndSet(false, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index c3d9836..e3fae22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -644,34 +643,23 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
final AffinityTopologyVersion updTopVer =
new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
cctx.affinity().affinityReadyFuture(updTopVer).listen(
new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
+ try {
+ fut.get();
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridPartitionedGetFuture.this.onDone(e);
- }
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridPartitionedGetFuture.this.onDone(e);
}
}
}
);
-
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index dfaa44e..f1bff61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -851,34 +850,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
final AffinityTopologyVersion updTopVer =
new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
cctx.affinity().affinityReadyFuture(updTopVer).listen(
new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
+ try {
+ fut.get();
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridNearGetFuture.this.onDone(e);
- }
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridNearGetFuture.this.onDone(e);
}
}
}
);
-
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index c75a8f38..3e0231e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -160,8 +163,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
CacheStoreManager store = cacheCtx.store();
- if (store.configured())
- return Collections.singleton(store);
+ if (store.configured()) {
+ HashSet<CacheStoreManager> set = new HashSet<>(3, 0.75f);
+
+ set.add(store);
+
+ return set;
+ }
return null;
}
@@ -192,12 +200,20 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> writeSet() {
- return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ if (entry != null) {
+ HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+ set.add(entry.txKey());
+
+ return set;
+ }
+ else
+ return Collections.<IgniteTxKey>emptySet();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
@@ -207,8 +223,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
- return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
- Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
+ return entry != null ? F.asMap(entry.txKey(), entry) : Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
}
/** {@inheritDoc} */
@@ -223,7 +238,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index 22f04a8..90af517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -62,12 +65,20 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> writeSet() {
- return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ if (entry != null) {
+ HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+ set.add(entry.txKey());
+
+ return set;
+ }
+ else
+ return Collections.<IgniteTxKey>emptySet();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
@@ -77,7 +88,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
- return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
+ return entry != null ? F.asMap(entry.txKey(), entry) :
Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
}
@@ -93,7 +104,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b532d7f..23d64cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -56,14 +56,13 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -532,21 +531,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (dataStructure != null)
return dataStructure;
- if (!create)
- return c.applyx();
-
while (true) {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ try {
+ if (!create)
+ return c.applyx();
- if (err != null)
- throw err;
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
- dataStructure = c.applyx();
+ if (err != null)
+ throw err;
- tx.commit();
+ dataStructure = c.applyx();
+
+ tx.commit();
- return dataStructure;
+ return dataStructure;
+ }
}
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
@@ -1605,27 +1606,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
*/
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
try {
- int cnt = 0;
-
- while (true) {
- try {
- return call.call();
- }
- catch (ClusterGroupEmptyCheckedException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IgniteTxRollbackCheckedException |
- CachePartialUpdateCheckedException |
- ClusterTopologyCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ return GridCacheUtils.retryTopologySafe(call).call();
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 28f8631..b433887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -23,7 +23,6 @@ import java.util.Map;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,26 +55,9 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
checkRemoved(idx);
- int cnt = 0;
-
GridCacheQueueItemKey key = itemKey(idx);
- while (true) {
- try {
- cache.getAndPut(key, item);
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ cache.getAndPut(key, item);
return true;
}
@@ -98,38 +80,18 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
GridCacheQueueItemKey key = itemKey(idx);
- int cnt = 0;
-
- long stop = 0;
+ T data = (T)cache.getAndRemove(key);
- while (true) {
- try {
- T data = (T)cache.getAndRemove(key);
+ if (data != null)
+ return data;
- if (data != null)
- return data;
+ long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
- if (stop == 0)
- stop = U.currentTimeMillis() + RETRY_TIMEOUT;
+ while (U.currentTimeMillis() < stop) {
+ data = (T)cache.getAndRemove(key);
- while (U.currentTimeMillis() < stop ) {
- data = (T)cache.getAndRemove(key);
-
- if (data != null)
- return data;
- }
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
+ if (data != null)
+ return data;
}
U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']');
@@ -161,24 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
idx++;
}
- int cnt = 0;
-
- while (true) {
- try {
- cache.putAll(putMap);
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ cache.putAll(putMap);
return true;
}
@@ -197,34 +142,14 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
GridCacheQueueItemKey key = itemKey(idx);
- int cnt = 0;
+ if (cache.remove(key))
+ return;
- long stop = 0;
-
- while (true) {
- try {
- if (cache.remove(key))
- return;
+ long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
- if (stop == 0)
- stop = U.currentTimeMillis() + RETRY_TIMEOUT;
-
- while (U.currentTimeMillis() < stop ) {
- if (cache.remove(key))
- return;
- }
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
+ while (U.currentTimeMillis() < stop) {
+ if (cache.remove(key))
+ return;
}
U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']');
@@ -239,21 +164,6 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
@SuppressWarnings("unchecked")
@Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c)
throws IgniteCheckedException {
- int cnt = 0;
-
- while (true) {
- try {
- return (Long)cache.invoke(queueKey, c).get();
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ return (Long)cache.invoke(queueKey, c).get();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index c0c38b2..37cdaea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
/**
* Cache atomic reference implementation.
@@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
* @return Callable for execution in async and sync mode.
*/
private Callable<Boolean> internalSet(final T val) {
- return new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
throw e;
}
}
- };
+ });
}
/**
@@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
*/
private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
final IgniteClosure<T, T> newValClos) {
- return new Callable<Boolean>() {
+
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
throw e;
}
}
- };
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 2667938..c984ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -342,20 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private class GetCountCallable implements Callable<Integer> {
/** {@inheritDoc} */
@Override public Integer call() throws Exception {
- Integer val;
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
- if (latchVal == null)
- return 0;
-
- val = latchVal.get();
-
- tx.rollback();
- }
-
- return val;
+ return latchVal == null ? 0 : latchVal.get();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 0e4aebc..df1bd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
/** */
- protected static final int MAX_UPDATE_RETRIES = 100;
-
- /** */
protected static final long RETRY_DELAY = 1;
/** */
@@ -169,14 +166,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@SuppressWarnings("unchecked")
@Nullable @Override public T peek() throws IgniteException {
try {
- GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
+ while (true) {
+ GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
- checkRemoved(hdr);
+ checkRemoved(hdr);
- if (hdr.empty())
- return null;
+ if (hdr.empty())
+ return null;
- return (T)cache.get(itemKey(hdr.head()));
+ T val = (T)cache.get(itemKey(hdr.head()));
+
+ if (val == null)
+ // Header might have been polled. Retry.
+ continue;
+
+ return val;
+ }
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -416,8 +421,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
long startIdx,
long endIdx,
int batchSize)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
for (long idx = startIdx; idx < endIdx; idx++) {
@@ -435,8 +439,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
}
/**
- * Checks result of closure modifying queue header, throws {@link IllegalStateException}
- * if queue was removed.
+ * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed.
*
* @param idx Result of closure execution.
*/
@@ -529,7 +532,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
*/
protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException;
-
/**
* @param idx Item index.
* @return Item key.
@@ -1036,7 +1038,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
if (o == null || getClass() != o.getClass())
return false;
- GridCacheQueueAdapter that = (GridCacheQueueAdapter) o;
+ GridCacheQueueAdapter that = (GridCacheQueueAdapter)o;
return id.equals(that.id);
@@ -1051,4 +1053,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@Override public String toString() {
return S.toString(GridCacheQueueAdapter.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index c7750a6..4880324 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(item, "item");
try {
- boolean retVal;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- int cnt = 0;
-
- while (true) {
- try {
try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
@@ -76,75 +72,59 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
tx.commit();
- break;
+ return retVal;
}
}
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public T poll() throws IgniteException {
try {
- int cnt = 0;
-
- T retVal;
-
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+ return retryTopologySafe(new Callable<T>() {
+ @Override public T call() throws Exception {
+ T retVal;
- if (idx != null) {
- checkRemoved(idx);
-
- retVal = (T)cache.getAndRemove(itemKey(idx));
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
- assert retVal != null : idx;
- }
- else
- retVal = null;
+ if (idx != null) {
+ checkRemoved(idx);
- tx.commit();
+ retVal = (T)cache.getAndRemove(itemKey(idx));
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ assert retVal != null : idx;
+ }
+ else
+ retVal = null;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@@ -153,95 +133,78 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(items, "items");
try {
- boolean retVal;
-
- int cnt = 0;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
-
- if (idx != null) {
- checkRemoved(idx);
-
- Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
- for (T item : items) {
- putMap.put(itemKey(idx), item);
+ if (idx != null) {
+ checkRemoved(idx);
- idx++;
- }
+ Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
- cache.putAll(putMap);
+ for (T item : items) {
+ putMap.put(itemKey(idx), item);
- retVal = true;
- }
- else
- retVal = false;
+ idx++;
+ }
- tx.commit();
+ cache.putAll(putMap);
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ retVal = true;
+ }
+ else
+ retVal = false;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException {
try {
- int cnt = 0;
+ retryTopologySafe(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
+ if (idx != null) {
+ checkRemoved(idx);
- if (idx != null) {
- checkRemoved(idx);
+ boolean rmv = cache.remove(itemKey(idx));
- boolean rmv = cache.remove(itemKey(idx));
+ assert rmv : idx;
+ }
- assert rmv : idx;
+ tx.commit();
}
- tx.commit();
-
- break;
+ return null;
}
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ }).call();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 612c1f1..1ea5014 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -164,4 +164,4 @@ public interface DiscoverySpi extends IgniteSpi {
* @throws IllegalStateException If discovery SPI has not started.
*/
public boolean isClientMode() throws IllegalStateException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ae23d0e..ae3c8cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -2152,6 +2153,41 @@ class ServerImpl extends TcpDiscoveryImpl {
initConnectionCheckFrequency();
}
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ super.body();
+ }
+ catch (Throwable e) {
+ if (!spi.isNodeStopping0()) {
+ final Ignite ignite = spi.ignite();
+
+ if (ignite != null) {
+ U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +
+ "Stopping the grid in order to prevent cluster wide instability.", e);
+
+ new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ spi.ignite().close();
+
+ U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " +
+ "message worker thread abnormal termination.");
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " +
+ "message worker thread abnormal termination.", e);
+ }
+ }
+ }).start();
+ }
+ }
+
+ // Must be processed by IgniteSpiThread as well.
+ throw e;
+ }
+ }
+
/**
* Initializes connection check frequency. Used only when failure detection timeout is enabled.
*/