You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/05 11:08:31 UTC
[ignite] 28/31: GG-18656 An attempt to restore the connection to a failed
client may trigger a SYSTEM_WORKER_BLOCKED failure in exchange-worker
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 5dcff9f6d73483e28aee4da53e293ac5518d713f
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Tue Jun 4 18:19:53 2019 +0300
GG-18656 An attempt to restore the connection to a failed client may trigger a SYSTEM_WORKER_BLOCKED failure in exchange-worker
(cherry-picked from commit #c15ef97)
---
.../cache/GridCachePartitionExchangeManager.java | 7 ++-
.../spi/communication/tcp/TcpCommunicationSpi.java | 30 +++++++++-
.../ignite/internal/IgniteClientFailuresTest.java | 69 ++++++++++++++++++----
3 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4f5d2fc..46dade4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -178,6 +178,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Stripe id for cluster activation event. */
private static final int CLUSTER_ACTIVATION_EVT_STRIPE_ID = Integer.MAX_VALUE;
+ /** Name given to the exchange worker; its thread is started under the same name via {@code exchWorker.name()}. */
+ private static final String EXCHANGE_WORKER_THREAD_NAME = "exchange-worker";
+
/** Atomic reference for pending partition resend timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
@@ -719,7 +722,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else if (reconnect)
reconnectExchangeFut.onDone();
- new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start();
+ new IgniteThread(cctx.igniteInstanceName(), exchWorker.name(), exchWorker).start();
if (reconnect) {
if (fut != null) {
@@ -2753,7 +2756,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Constructor.
*/
private ExchangeWorker() {
- super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log,
+ super(cctx.igniteInstanceName(), EXCHANGE_WORKER_THREAD_NAME, GridCachePartitionExchangeManager.this.log,
cctx.kernalContext().workersRegistry());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2a9fb9a..16b7983 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -66,6 +66,7 @@ import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
@@ -2954,7 +2955,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else
fut = oldFut;
- client = fut.get();
+ WorkersRegistry registry = getWorkersRegistry(ignite);
+ // Wait in slices of 1/3 of the system-worker-blocked timeout (of connTimeout when there is no registry).
+ long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3
+ : connTimeout / 3;
+ // Total time waited so far. It is a duration starting at zero (not a wall-clock timestamp) because it is logged in ms.
+ long currTimeout = 0;
+
+ // This cycle will eventually quit when future is completed by concurrent thread reserving client.
+ while (true) {
+ try {
+ client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ currTimeout += clientReserveWaitTimeout;
+
+ if (log.isDebugEnabled())
+ log.debug("Still waiting for reestablishing connection to node [nodeId=" + node.id() + ", waitingTime=" + currTimeout + "ms]");
+ // Refresh the heartbeat of the worker running on this thread (if any) so the wait is not flagged as SYSTEM_WORKER_BLOCKED.
+ if (registry != null) {
+ GridWorker wrkr = registry.worker(Thread.currentThread().getName());
+
+ if (wrkr != null)
+ wrkr.updateHeartbeat();
+ }
+ }
+ }
if (client == null) {
if (isLocalNodeDisconnected())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
index 82522ae..f1d12f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -35,7 +36,7 @@ import org.junit.Test;
*/
public class IgniteClientFailuresTest extends GridCommonAbstractTest {
/** */
- private boolean clientMode;
+ private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor=";
/** */
private GridStringLogger inMemoryLog;
@@ -44,13 +45,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setClientMode(clientMode);
-
- if (!clientMode) {
+ if (igniteInstanceName.contains("client"))
+ cfg.setClientMode(true);
+ else {
cfg.setClientFailureDetectionTimeout(10_000);
cfg.setSystemWorkerBlockedTimeout(5_000);
+ cfg.setNetworkTimeout(5_000);
+
cfg.setGridLogger(inMemoryLog);
}
@@ -77,13 +80,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
*/
@Test
public void testNoMessagesFromFailureProcessor() throws Exception {
- inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger());
+ GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+ strLog.logLength(1024 * 1024);
- inMemoryLog.logLength(1024 * 1024);
+ inMemoryLog = strLog;
IgniteEx srv = startGrid(0);
- clientMode = true;
+ inMemoryLog = null;
IgniteEx client00 = startGrid("client00");
@@ -99,7 +104,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
assertTrue(waitRes);
- assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker"));
+ assertFalse(strLog.toString().contains("name=tcp-comm-worker"));
}
/**
@@ -112,12 +117,8 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
public void testFailedClientLeavesTopologyAfterTimeout() throws Exception {
IgniteEx srv0 = startGrid(0);
- clientMode = true;
-
IgniteEx client00 = startGrid("client00");
- Thread.sleep(5_000);
-
client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
breakClient(client00);
@@ -138,6 +139,50 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
assertTrue(waitRes);
}
+ /**
+ * Verifies that when a system thread on a server node tries to re-establish a connection to a failed client
+ * and exchange-worker blocks waiting for it (e.g. to send the full partition map),
+ * this is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED},
+ * because such waiting is finite and part of normal operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception {
+ GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+ strLog.logLength(1024 * 1024);
+ // Only srv0 logs into strLog: getConfiguration() reads inMemoryLog for servers, and it is reset below.
+ inMemoryLog = strLog;
+
+ IgniteEx srv0 = startGrid(0);
+
+ inMemoryLog = null;
+
+ IgniteEx client00 = startGrid("client00");
+
+ client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
+
+ startGrid(1);
+
+ breakClient(client00);
+ // Right after being broken, client00 is still in the topology together with the two servers.
+ final IgniteClusterEx cl = srv0.cluster();
+
+ assertEquals(3, cl.topology(cl.topologyVersion()).size());
+
+ startGrid("client01");
+ // With client01 joined, topology size 3 again means client00 has been dropped.
+ boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3),
+ 20_000);
+
+ assertTrue(waitRes);
+
+ String logRes = strLog.toString();
+ // srv0's exchange-worker must never have been reported as blocked.
+ assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG));
+ }
+
/** */
private void checkCacheOperations(IgniteCache cache) {
for (int i = 0; i < 100; i++)