You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/10/22 15:20:20 UTC
ignite git commit: IGNITE-9738 Client node can suddenly fail on start
- Fixes #4968.
Repository: ignite
Updated Branches:
refs/heads/master e1f8f46f9 -> d82b21ec5
IGNITE-9738 Client node can suddenly fail on start - Fixes #4968.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d82b21ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d82b21ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d82b21ec
Branch: refs/heads/master
Commit: d82b21ec56a956fa7cc5374e3f15e279e7c492ac
Parents: e1f8f46
Author: vd-pyatkov <vp...@gridgain.com>
Authored: Mon Oct 22 18:19:53 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Oct 22 18:19:53 2018 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 10 +-
.../LongClientConnectToClusterTest.java | 173 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
3 files changed, 180 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d3a8b18..92c197a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -305,6 +305,11 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}.start();
+ timer.schedule(
+ new MetricsSender(),
+ spi.metricsUpdateFreq,
+ spi.metricsUpdateFreq);
+
try {
joinLatch.await();
@@ -317,11 +322,6 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
- timer.schedule(
- new MetricsSender(),
- spi.metricsUpdateFreq,
- spi.metricsUpdateFreq);
-
spi.printStartInfo();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
new file mode 100644
index 0000000..a079926
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Tests that a client can join a two-node cluster when the join takes longer than the
+ * {@link org.apache.ignite.configuration.IgniteConfiguration#clientFailureDetectionTimeout}.
+ */
+public class LongClientConnectToClusterTest extends GridCommonAbstractTest {
+ /** Client instance name. */
+ public static final String CLIENT_INSTANCE_NAME = "client";
+ /** Client metrics update count. */
+ private static volatile int clientMetricsUpdateCnt;
+
+ /** {@inheritDoc} Picks the discovery SPI by instance name: delaying for grid 0, metrics-intercepting for grid 1. */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ TcpDiscoverySpi discoSpi = getTestIgniteInstanceName(0).equals(igniteInstanceName)
+ ? new DelayedTcpDiscoverySpi()
+ : getTestIgniteInstanceName(1).equals(igniteInstanceName)
+ ? new UpdateMetricsInterceptorTcpDiscoverySpi()
+ : new TcpDiscoverySpi();
+
+ return super.getConfiguration(igniteInstanceName)
+ .setClientMode(igniteInstanceName.startsWith(CLIENT_INSTANCE_NAME))
+ .setClientFailureDetectionTimeout(1_000)
+ .setMetricsUpdateFrequency(500)
+ .setDiscoverySpi(discoSpi
+ .setReconnectCount(1)
+ .setLocalAddress("127.0.0.1")
+ .setIpFinder(new TcpDiscoveryVmIpFinder()
+ .setAddresses(Collections.singletonList(igniteInstanceName.startsWith(CLIENT_INSTANCE_NAME)
+ ? "127.0.0.1:47501"
+ : "127.0.0.1:47500..47502"))));
+ }
+
+ /** {@inheritDoc} Starts the two grids that the test client later connects to. */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(2);
+ }
+
+ /** {@inheritDoc} Stops all grids so that the next test starts from a clean state. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Starts a client node and checks that its metrics updates were seen and that it joined a three-node topology.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientConnectToCluster() throws Exception {
+ clientMetricsUpdateCnt = 0;
+
+ IgniteEx client = startGrid(CLIENT_INSTANCE_NAME);
+
+ assertTrue(clientMetricsUpdateCnt > 0);
+
+ assertTrue(client.localNode().isClient());
+
+ assertEquals(3, client.cluster().nodes().size());
+ }
+
+ /** Discovery SPI which intercepts metrics update events and counts those that originate from client nodes. */
+ private static class UpdateMetricsInterceptorTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** Listener wrapper that inspects each discovery event before forwarding it to the wrapped listener. */
+ private class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+ /** Wrapped listener, may be {@code null}. */
+ private DiscoverySpiListener delegate;
+
+ /**
+ * @param delegate Listener to forward events to, may be {@code null}.
+ */
+ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> onDiscovery(
+ int type,
+ long topVer,
+ ClusterNode node,
+ Collection<ClusterNode> topSnapshot,
+ @Nullable Map<Long, Collection<ClusterNode>> topHist,
+ @Nullable DiscoverySpiCustomMessage spiCustomMsg
+ ) {
+ if (EventType.EVT_NODE_METRICS_UPDATED == type) {
+ log.info("Metrics update message catched from node " + node);
+
+ assertFalse(locNode.isClient());
+
+ if (node.isClient())
+ clientMetricsUpdateCnt++; // NOTE(review): non-atomic increment of a volatile; safe only with a single writer thread - confirm.
+ }
+
+ if (delegate != null)
+ return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+
+ return new IgniteFinishedFutureImpl<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onLocalNodeInitialized(ClusterNode locNode) {
+ if (delegate != null)
+ delegate.onLocalNodeInitialized(locNode);
+ }
+ }
+
+ /** {@inheritDoc} Installs a wrapper around the given listener so that metrics update events can be observed. */
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ super.setListener(new DiscoverySpiListenerWrapper(lsnr));
+ }
+ }
+
+ /** Discovery SPI which delays writing of a {@link TcpDiscoveryNodeAddFinishedMessage} with topology version 3. */
+ private static class DelayedTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** How long the intercepted message is held back before it is written, in milliseconds. */
+ public static final int DELAY_MSG_PERIOD_MILLIS = 2_000;
+
+ /** {@inheritDoc} Sleeps before writing the matching message; the write still happens if the sleep is interrupted. */
+ @Override protected void writeToSocket(ClusterNode node, Socket sock, OutputStream out,
+ TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && msg.topologyVersion() == 3) {
+ log.info("Catched discovery message: " + msg);
+
+ try {
+ Thread.sleep(DELAY_MSG_PERIOD_MILLIS);
+ }
+ catch (InterruptedException e) {
+ log.error("Interrupt on DelayedTcpDiscoverySpi.", e);
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ super.writeToSocket(node, sock, out, msg, timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d82b21ec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 04869f9..80f093d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest;
+import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest;
import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
@@ -97,6 +98,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
+ suite.addTest(new TestSuite(LongClientConnectToClusterTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));