You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/05 11:08:31 UTC

[ignite] 28/31: GG-18656 attempt to restore connection to failed client may lead to firing SYSTEM_WORKER_BLOCKED failure in exchange-worker

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-19225
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 5dcff9f6d73483e28aee4da53e293ac5518d713f
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Tue Jun 4 18:19:53 2019 +0300

    GG-18656 attempt to restore connection to failed client may lead to firing SYSTEM_WORKER_BLOCKED failure in exchange-worker
    
    (cherry-picked from commit #c15ef97)
---
 .../cache/GridCachePartitionExchangeManager.java   |  7 ++-
 .../spi/communication/tcp/TcpCommunicationSpi.java | 30 +++++++++-
 .../ignite/internal/IgniteClientFailuresTest.java  | 69 ++++++++++++++++++----
 3 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4f5d2fc..46dade4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -178,6 +178,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Stripe id for cluster activation event. */
     private static final int CLUSTER_ACTIVATION_EVT_STRIPE_ID = Integer.MAX_VALUE;
 
+    /** Name of the exchange worker; also used as the name of the thread that runs it. */
+    private static final String EXCHANGE_WORKER_THREAD_NAME = "exchange-worker";
+
     /** Atomic reference for pending partition resend timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -719,7 +722,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         else if (reconnect)
             reconnectExchangeFut.onDone();
 
-        new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start();
+        new IgniteThread(cctx.igniteInstanceName(), exchWorker.name(), exchWorker).start();
 
         if (reconnect) {
             if (fut != null) {
@@ -2753,7 +2756,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
          * Constructor.
          */
         private ExchangeWorker() {
-            super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log,
+            super(cctx.igniteInstanceName(), EXCHANGE_WORKER_THREAD_NAME, GridCachePartitionExchangeManager.this.log,
                 cctx.kernalContext().workersRegistry());
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2a9fb9a..16b7983 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -66,6 +66,7 @@ import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
@@ -2954,7 +2955,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 else
                     fut = oldFut;
 
-                client = fut.get();
+                WorkersRegistry registry = getWorkersRegistry(ignite);
+
+                long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3
+                    : connTimeout / 3;
+
+                long currTimeout = System.currentTimeMillis();
+
+                // This loop eventually exits once the future is completed by a concurrent thread reserving the client.
+                while (true) {
+                    try {
+                        client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        currTimeout += clientReserveWaitTimeout;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Still waiting for reestablishing connection to node [nodeId=" + node.id() + ", waitingTime=" + currTimeout + "ms]");
+
+                        if (registry != null) {
+                            GridWorker wrkr = registry.worker(Thread.currentThread().getName());
+
+                            if (wrkr != null)
+                                wrkr.updateHeartbeat();
+                        }
+                    }
+                }
 
                 if (client == null) {
                     if (isLocalNodeDisconnected())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
index 82522ae..f1d12f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.internal;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -35,7 +36,7 @@ import org.junit.Test;
  */
 public class IgniteClientFailuresTest extends GridCommonAbstractTest {
     /** */
-    private boolean clientMode;
+    private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor=";
 
     /** */
     private GridStringLogger inMemoryLog;
@@ -44,13 +45,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setClientMode(clientMode);
-
-        if (!clientMode) {
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+        else {
             cfg.setClientFailureDetectionTimeout(10_000);
 
             cfg.setSystemWorkerBlockedTimeout(5_000);
 
+            cfg.setNetworkTimeout(5_000);
+
             cfg.setGridLogger(inMemoryLog);
         }
 
@@ -77,13 +80,15 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
      */
     @Test
     public void testNoMessagesFromFailureProcessor() throws Exception {
-        inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger());
+        GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+        strLog.logLength(1024 * 1024);
 
-        inMemoryLog.logLength(1024 * 1024);
+        inMemoryLog = strLog;
 
         IgniteEx srv = startGrid(0);
 
-        clientMode = true;
+        inMemoryLog = null;
 
         IgniteEx client00 = startGrid("client00");
 
@@ -99,7 +104,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
 
         assertTrue(waitRes);
 
-        assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker"));
+        assertFalse(strLog.toString().contains("name=tcp-comm-worker"));
     }
 
     /**
@@ -112,12 +117,8 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
     public void testFailedClientLeavesTopologyAfterTimeout() throws Exception {
         IgniteEx srv0 = startGrid(0);
 
-        clientMode = true;
-
         IgniteEx client00 = startGrid("client00");
 
-        Thread.sleep(5_000);
-
         client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
 
         breakClient(client00);
@@ -138,6 +139,50 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest {
         assertTrue(waitRes);
     }
 
+    /**
+     * Test verifies that when some system thread (on a server node) tries to re-establish a connection to a failed
+     * client and the exchange-worker gets blocked waiting for it (e.g. to send the partitions full map),
+     * this is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED},
+     * because the wait is finite and a part of normal operations.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception {
+        GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger());
+
+        strLog.logLength(1024 * 1024);
+
+        inMemoryLog = strLog;
+
+        IgniteEx srv0 = startGrid(0);
+
+        inMemoryLog = null;
+
+        IgniteEx client00 = startGrid("client00");
+
+        client00.getOrCreateCache(new CacheConfiguration<>("cache0"));
+
+        startGrid(1);
+
+        breakClient(client00);
+
+        final IgniteClusterEx cl = srv0.cluster();
+
+        assertEquals(3, cl.topology(cl.topologyVersion()).size());
+
+        startGrid("client01");
+
+        boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3),
+            20_000);
+
+        assertTrue(waitRes);
+
+        String logRes = strLog.toString();
+
+        assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG));
+    }
+
     /** */
     private void checkCacheOperations(IgniteCache cache) {
         for (int i = 0; i < 100; i++)