You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/10/31 11:35:33 UTC
ignite git commit: IGNITE-6690 DiscoverySpi: Clientmode Ignite should
not fail on handshake. This closes #2923.
Repository: ignite
Updated Branches:
refs/heads/master 87e37975d -> 035dcb7f7
IGNITE-6690 DiscoverySpi: Clientmode Ignite should not fail on handshake. This closes #2923.
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/035dcb7f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/035dcb7f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/035dcb7f
Branch: refs/heads/master
Commit: 035dcb7f7163384648b2d01134d9e40fbb806bcf
Parents: 87e3797
Author: Alexey Popov <ta...@gmail.com>
Authored: Tue Oct 31 14:33:58 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Oct 31 14:35:29 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 14 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 14 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 22 ++
.../tcp/TcpDiscoveryWithWrongServerTest.java | 332 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
5 files changed, 377 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 139c110..c9a4a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -702,11 +702,17 @@ class ClientImpl extends TcpDiscoveryImpl {
}
if (X.hasCause(e, StreamCorruptedException.class)) {
- if (--sslConnectAttempts == 0)
- throw new IgniteSpiException("Unable to establish plain connection. " +
- "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+ // StreamCorruptedException could be caused by remote node failover
+ if (connectAttempts < 2) {
+ connectAttempts++;
- continue;
+ continue;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Connect failed with StreamCorruptedException, skip address: " + addr);
+
+ break;
}
if (timeoutHelper.checkFailureTimeoutReached(e))
http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 1c3ec2e..57d2faf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1272,11 +1272,17 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (X.hasCause(e, StreamCorruptedException.class)) {
- if (--sslConnectAttempts == 0)
- throw new IgniteException("Unable to establish plain connection. " +
- "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+ // StreamCorruptedException could be caused by remote node failover
+ if (connectAttempts < 2) {
+ connectAttempts++;
- continue;
+ continue;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Connect failed with StreamCorruptedException, skip address: " + addr);
+
+ break;
}
if (timeoutHelper.checkFailureTimeoutReached(e))
http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 922e787..3b83b2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -40,7 +41,9 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.ignite.Ignite;
@@ -265,6 +268,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
+ /** Ssl message pattern for StreamCorruptedException. */
+ private static Pattern sslMsgPattern = Pattern.compile("invalid stream header: 150\\d0\\d00");
+
/** Local address. */
protected String locAddr;
@@ -1616,6 +1622,22 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
"long GC pauses on remote node) [curTimeout=" + timeout +
", rmtAddr=" + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() + ']');
+ StreamCorruptedException streamCorruptedCause = X.cause(e, StreamCorruptedException.class);
+
+ if (streamCorruptedCause != null) {
+ // Let's check StreamCorruptedException for SSL Alert message
+ // Sample SSL Alert message: 15:03:03:00:02:02:0a
+ // 15 = Alert
+ // 03:03 = SSL version
+ // 00:02 = payload length
+ // 02:0a = critical (02) / unexpected message (0a)
+ // So, check message for "invalid stream header: 150X0X00"
+
+ String msg = streamCorruptedCause.getMessage();
+
+ if (msg != null && sslMsgPattern.matcher(msg).matches())
+ streamCorruptedCause.initCause(new SSLException("Detected SSL alert in StreamCorruptedException"));
+ }
throw e;
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
new file mode 100644
index 0000000..768f5f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestThread;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Client-based discovery SPI test with non-Ignite servers.
+ */
+public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest {
+ /** Non-Ignite Server port. */
+ private final static int SERVER_PORT = 47500;
+
+ /** Non-Ignite Server socket. */
+ private ServerSocket srvSock;
+
+ /** Count of accepted connections to non-Ignite Server. */
+ private int connCnt;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+
+ ipFinder.setAddresses(Collections.singleton("127.0.0.1:" + Integer.toString(SERVER_PORT) + ".." +
+ Integer.toString(SERVER_PORT + 2)));
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpiWithOrderedIps().setIpFinder(ipFinder));
+
+ if (igniteInstanceName.startsWith("client"))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopTcpThread();
+
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Starts tcp test thread
+ * @param workerFactory one of WorkerFactory
+ */
+ private void startTcpThread(final WorkerFactory workerFactory) {
+ connCnt = 0;
+
+ try {
+ srvSock = new ServerSocket(SERVER_PORT, 10, InetAddress.getByName("127.0.0.1"));
+ }
+ catch (Exception e) {
+ fail("Unexpected TcpServer exception " + e.getMessage());
+ }
+
+ new GridTestThread(new Runnable() {
+ @Override public void run() {
+ try {
+ while(!Thread.currentThread().isInterrupted()) {
+ Socket clientSock = srvSock.accept();
+
+ connCnt++;
+
+ // Create a new thread for socket connection.
+ new GridTestThread(workerFactory.newWorker(clientSock)).start();
+ }
+ }
+ catch (Exception e) {
+ if (!srvSock.isClosed())
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+
+ /**
+ * Stops tcp test thread
+ * @throws IOException IOException
+ */
+ private void stopTcpThread() throws IOException {
+ if (srvSock != null && !srvSock.isClosed())
+ srvSock.close();
+ }
+
+ /**
+ * Test that Client successfully ignores wrong responses during Discovery Handshake Procedure.
+ *
+ * @throws Exception in case of error.
+ */
+ public void testWrongHandshakeResponse() throws Exception {
+ startTcpThread(new SomeResponseWorker());
+
+ simpleTest();
+ }
+
+ /**
+ * Test that Client successfully ignores wrong responses during Discovery Handshake Procedure.
+ *
+ * @throws Exception in case of error.
+ */
+ public void testNoHandshakeResponse() throws Exception {
+ startTcpThread(new NoResponseWorker());
+
+ simpleTest();
+ }
+
+ /**
+ * Test that Client successfully ignores when server closes sockets after Discovery Handshake Request.
+ *
+ * @throws Exception in case of error.
+ */
+ public void testDisconnectOnRequest() throws Exception {
+ startTcpThread(new DisconnectOnRequestWorker());
+
+ simpleTest();
+ }
+
+ /**
+ * Test that Client successfully ignores when server closes sockets immediately.
+ *
+ * @throws Exception in case of error.
+ */
+ public void testEarlyDisconnect() throws Exception {
+ startTcpThread(new EarlyDisconnectWorker());
+
+ simpleTest();
+ }
+
+ /**
+ * Some simple sanity check with the Server and Client
+ * It is expected that both client and server could successfully perform Discovery Procedure when there is
+ * unknown (test) server in the ipFinder list.
+ */
+ private void simpleTest() {
+ try {
+ Ignite srv = startGrid("server");
+ Ignite client = startGrid("client");
+
+ awaitPartitionMapExchange();
+
+ assertEquals(2, srv.cluster().nodes().size());
+ assertEquals(2, client.cluster().nodes().size());
+ assertTrue(connCnt >= 2);
+
+ srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(1, 1);
+
+ assertEquals(1, client.getOrCreateCache(DEFAULT_CACHE_NAME).get(1));
+ }
+ catch (Exception e) {
+ fail("Failed with unexpected exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Just a factory for runnable workers
+ */
+ private interface WorkerFactory {
+ /**
+ * Creates a new worker for socket
+ * @param clientSock socket for worker
+ * @return runnable Worker
+ */
+ Runnable newWorker(Socket clientSock);
+ }
+
+ /**
+ * SocketWorker
+ */
+ private abstract class SocketWorker implements Runnable {
+ /** Client socket. */
+ Socket clientSock;
+
+ /**
+ * @param clientSock Client socket.
+ */
+ SocketWorker(Socket clientSock) {
+ this.clientSock = clientSock;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ InputStream input = clientSock.getInputStream();
+ OutputStream output = clientSock.getOutputStream();
+ byte[] buf = new byte[1024];
+
+ while (!clientSock.isClosed() && input.read(buf) > 0)
+ action(input, output);
+
+ if (!clientSock.isClosed())
+ clientSock.close();
+ }
+ catch (IOException e) {
+ log.error("Unexpected error", e);
+ }
+ }
+
+ /**
+ * @param input socket input stream
+ * @param output socket output stream
+ * @throws IOException IOException
+ */
+ public abstract void action(InputStream input, OutputStream output) throws IOException;
+ }
+
+ /**
+ * SomeResponseWorker.
+ */
+ private class SomeResponseWorker implements WorkerFactory {
+ /** {@inheritDoc} */
+ @Override public Runnable newWorker(Socket clientSock) {
+ return new SocketWorker(clientSock) {
+ @Override public void action(InputStream input, OutputStream output) throws IOException {
+ output.write("Some response".getBytes());
+
+ log.error("TEST: Some response was sent to " + clientSock.getRemoteSocketAddress());
+ }
+ };
+ }
+ }
+
+ /**
+ * NoResponseWorker.
+ */
+ private class NoResponseWorker implements WorkerFactory {
+ /** {@inheritDoc} */
+ @Override public Runnable newWorker(Socket clientSock) {
+ return new SocketWorker(clientSock) {
+ @Override public void action(InputStream input, OutputStream output) throws IOException {
+ log.error("TEST: No response was sent to " + clientSock.getRemoteSocketAddress());
+ }
+ };
+ }
+ }
+
+ /**
+ * DisconnectOnRequestWorker.
+ */
+ private class DisconnectOnRequestWorker implements WorkerFactory {
+ /** {@inheritDoc} */
+ @Override public Runnable newWorker(Socket clientSock) {
+ return new SocketWorker(clientSock) {
+ @Override public void action(InputStream input, OutputStream output) throws IOException {
+ clientSock.close();
+
+ log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
+ }
+ };
+ }
+ }
+
+ /**
+ * EarlyDisconnectWorker.
+ */
+ private class EarlyDisconnectWorker implements WorkerFactory {
+ /** {@inheritDoc} */
+ @Override public Runnable newWorker(Socket clientSock) {
+ return new SocketWorker(clientSock) {
+ @Override public void action(InputStream input, OutputStream output) throws IOException {
+ // No-op
+ }
+
+ @Override public void run() {
+ try {
+ clientSock.close();
+
+ log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
+ }
+ catch (IOException e) {
+ log.error("Unexpected error", e);
+ }
+ }
+ };
+ }
+ }
+
+ /**
+ * TcpDiscoverySpi with non-shuffled resolved IP addresses. We should ensure that in this test non-Ignite server
+ * is the first element of the addresses list
+ */
+ class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+ Collection<InetSocketAddress> shuffled = super.resolvedAddresses();
+ List<InetSocketAddress> res = new ArrayList<>(shuffled);
+
+ Collections.sort(res, new Comparator<InetSocketAddress>() {
+ @Override public int compare(InetSocketAddress o1, InetSocketAddress o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ });
+
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ff4c9c1..626875c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -43,6 +43,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest;
@@ -95,6 +96,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
suite.addTest(new TestSuite(AuthenticationRestartTest.class));
+ suite.addTest(new TestSuite(TcpDiscoveryWithWrongServerTest.class));
+
// Client connect.
suite.addTest(new TestSuite(IgniteClientConnectTest.class));
suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));