You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/22 15:20:20 UTC

ignite git commit: IGNITE-9738 Client node can suddenly fail on start - Fixes #4968.

Repository: ignite
Updated Branches:
  refs/heads/master e1f8f46f9 -> d82b21ec5


IGNITE-9738 Client node can suddenly fail on start - Fixes #4968.

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d82b21ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d82b21ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d82b21ec

Branch: refs/heads/master
Commit: d82b21ec56a956fa7cc5374e3f15e279e7c492ac
Parents: e1f8f46
Author: vd-pyatkov <vp...@gridgain.com>
Authored: Mon Oct 22 18:19:53 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Oct 22 18:19:53 2018 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  10 +-
 .../LongClientConnectToClusterTest.java         | 173 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 3 files changed, 180 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d3a8b18..92c197a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -305,6 +305,11 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
         }.start();
 
+        timer.schedule(
+            new MetricsSender(),
+            spi.metricsUpdateFreq,
+            spi.metricsUpdateFreq);
+
         try {
             joinLatch.await();
 
@@ -317,11 +322,6 @@ class ClientImpl extends TcpDiscoveryImpl {
             throw new IgniteSpiException("Thread has been interrupted.", e);
         }
 
-        timer.schedule(
-            new MetricsSender(),
-            spi.metricsUpdateFreq,
-            spi.metricsUpdateFreq);
-
         spi.printStartInfo();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
new file mode 100644
index 0000000..a079926
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Tests that a client can join a two-node cluster when the join takes longer than the
+ * {@link org.apache.ignite.configuration.IgniteConfiguration#clientFailureDetectionTimeout}.
+ */
+public class LongClientConnectToClusterTest extends GridCommonAbstractTest {
+    /** Client instance name. */
+    public static final String CLIENT_INSTANCE_NAME = "client";
+    /** Client metrics update count. */
+    private static volatile int clientMetricsUpdateCnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        TcpDiscoverySpi discoSpi = getTestIgniteInstanceName(0).equals(igniteInstanceName)
+            ? new DelayedTcpDiscoverySpi()
+            : getTestIgniteInstanceName(1).equals(igniteInstanceName)
+            ? new UpdateMetricsInterceptorTcpDiscoverySpi()
+            : new TcpDiscoverySpi();
+
+        return super.getConfiguration(igniteInstanceName)
+            .setClientMode(igniteInstanceName.startsWith(CLIENT_INSTANCE_NAME))
+            .setClientFailureDetectionTimeout(1_000)
+            .setMetricsUpdateFrequency(500)
+            .setDiscoverySpi(discoSpi
+                .setReconnectCount(1)
+                .setLocalAddress("127.0.0.1")
+                .setIpFinder(new TcpDiscoveryVmIpFinder()
+                    .setAddresses(Collections.singletonList(igniteInstanceName.startsWith(CLIENT_INSTANCE_NAME)
+                        ? "127.0.0.1:47501"
+                        : "127.0.0.1:47500..47502"))));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that a client whose join is delayed still sends metrics updates and joins the cluster.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientConnectToCluster() throws Exception {
+        clientMetricsUpdateCnt = 0;
+
+        IgniteEx client = startGrid(CLIENT_INSTANCE_NAME);
+
+        assertTrue(clientMetricsUpdateCnt > 0);
+
+        assertTrue(client.localNode().isClient());
+
+        assertEquals(3, client.cluster().nodes().size());
+    }
+
+    /** Discovery SPI that intercepts metrics update events and counts those coming from client nodes. */
+    private static class UpdateMetricsInterceptorTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+            /** */
+            private DiscoverySpiListener delegate;
+
+            /**
+             * @param delegate Delegate.
+             */
+            private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) {
+                this.delegate = delegate;
+            }
+
+            /** {@inheritDoc} */
+            @Override public IgniteFuture<?> onDiscovery(
+                int type,
+                long topVer,
+                ClusterNode node,
+                Collection<ClusterNode> topSnapshot,
+                @Nullable Map<Long, Collection<ClusterNode>> topHist,
+                @Nullable DiscoverySpiCustomMessage spiCustomMsg
+            ) {
+                if (EventType.EVT_NODE_METRICS_UPDATED == type) {
+                    log.info("Metrics update message catched from node " + node);
+
+                    assertFalse(locNode.isClient());
+
+                    if (node.isClient())
+                        clientMetricsUpdateCnt++;
+                }
+
+                if (delegate != null)
+                    return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+
+                return new IgniteFinishedFutureImpl<>();
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onLocalNodeInitialized(ClusterNode locNode) {
+                if (delegate != null)
+                    delegate.onLocalNodeInitialized(locNode);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+            super.setListener(new DiscoverySpiListenerWrapper(lsnr));
+        }
+    }
+
+    /** Discovery SPI that delays sending of TcpDiscoveryNodeAddFinishedMessage for topology version 3. */
+    private static class DelayedTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Delay message period millis. */
+        public static final int DELAY_MSG_PERIOD_MILLIS = 2_000;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(ClusterNode node, Socket sock, OutputStream out,
+            TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && msg.topologyVersion() == 3) {
+                log.info("Catched discovery message: " + msg);
+
+                try {
+                    Thread.sleep(DELAY_MSG_PERIOD_MILLIS);
+                }
+                catch (InterruptedException e) {
+                    log.error("Interrupt on DelayedTcpDiscoverySpi.", e);
+
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            super.writeToSocket(node, sock, out, msg, timeout);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 04869f9..80f093d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
 import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
 import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
 import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest;
+import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest;
 import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
@@ -97,6 +98,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class));
 
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
+        suite.addTest(new TestSuite(LongClientConnectToClusterTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));