You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/10/31 11:35:33 UTC

ignite git commit: IGNITE-6690 DiscoverySpi: Clientmode Ignite should not fail on handshake. This closes #2923.

Repository: ignite
Updated Branches:
  refs/heads/master 87e37975d -> 035dcb7f7


IGNITE-6690 DiscoverySpi: Clientmode Ignite should not fail on handshake. This closes #2923.

Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/035dcb7f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/035dcb7f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/035dcb7f

Branch: refs/heads/master
Commit: 035dcb7f7163384648b2d01134d9e40fbb806bcf
Parents: 87e3797
Author: Alexey Popov <ta...@gmail.com>
Authored: Tue Oct 31 14:33:58 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Oct 31 14:35:29 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  14 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  14 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  22 ++
 .../tcp/TcpDiscoveryWithWrongServerTest.java    | 332 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   3 +
 5 files changed, 377 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 139c110..c9a4a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -702,11 +702,17 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
 
                 if (X.hasCause(e, StreamCorruptedException.class)) {
-                    if (--sslConnectAttempts == 0)
-                        throw new IgniteSpiException("Unable to establish plain connection. " +
-                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+                    // StreamCorruptedException could be caused by remote node failover
+                    if (connectAttempts < 2) {
+                        connectAttempts++;
 
-                    continue;
+                        continue;
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Connect failed with StreamCorruptedException, skip address: " + addr);
+
+                    break;
                 }
 
                 if (timeoutHelper.checkFailureTimeoutReached(e))

http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 1c3ec2e..57d2faf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1272,11 +1272,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 if (X.hasCause(e, StreamCorruptedException.class)) {
-                    if (--sslConnectAttempts == 0)
-                        throw new IgniteException("Unable to establish plain connection. " +
-                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+                    // StreamCorruptedException could be caused by remote node failover
+                    if (connectAttempts < 2) {
+                        connectAttempts++;
 
-                    continue;
+                        continue;
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Connect failed with StreamCorruptedException, skip address: " + addr);
+
+                    break;
                 }
 
                 if (timeoutHelper.checkFailureTimeoutReached(e))

http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 922e787..3b83b2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.StreamCorruptedException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -40,7 +41,9 @@ import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLServerSocketFactory;
 import javax.net.ssl.SSLSocketFactory;
 import org.apache.ignite.Ignite;
@@ -265,6 +268,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
     /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
     public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
 
+    /** Ssl message pattern for StreamCorruptedException. */
+    private static Pattern sslMsgPattern = Pattern.compile("invalid stream header: 150\\d0\\d00");
+
     /** Local address. */
     protected String locAddr;
 
@@ -1616,6 +1622,22 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
                     "long GC pauses on remote node) [curTimeout=" + timeout +
                     ", rmtAddr=" + sock.getRemoteSocketAddress() + ", rmtPort=" + sock.getPort() + ']');
 
+            StreamCorruptedException streamCorruptedCause = X.cause(e, StreamCorruptedException.class);
+
+            if (streamCorruptedCause != null) {
+                // Lets check StreamCorruptedException for SSL Alert message
+                // Sample SSL Alert message: 15:03:03:00:02:02:0a
+                // 15 = Alert
+                // 03:03 = SSL version
+                // 00:02 = payload length
+                // 02:0a = critical (02) / unexpected message (0a)
+                // So, check message for "invalid stream header: 150X0X00"
+
+                String msg = streamCorruptedCause.getMessage();
+
+                if (msg != null && sslMsgPattern.matcher(msg).matches())
+                    streamCorruptedCause.initCause(new SSLException("Detected SSL alert in StreamCorruptedException"));
+            }
             throw e;
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
new file mode 100644
index 0000000..768f5f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestThread;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Client-based discovery SPI test with non-Ignite servers.
+ */
+public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest {
+    /** Non-Ignite Server port. */
+    private final static int SERVER_PORT = 47500;
+
+    /** Non-Ignite Server socket. */
+    private ServerSocket srvSock;
+
+    /** Count of accepted connections to non-Ignite Server. */
+    private int connCnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:" + Integer.toString(SERVER_PORT) + ".." +
+            Integer.toString(SERVER_PORT + 2)));
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpiWithOrderedIps().setIpFinder(ipFinder));
+
+        if (igniteInstanceName.startsWith("client"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopTcpThread();
+
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Starts tcp test thread
+     * @param workerFactory one of WorkerFactory
+     */
+    private void startTcpThread(final WorkerFactory workerFactory) {
+        connCnt = 0;
+
+        try {
+            srvSock = new ServerSocket(SERVER_PORT, 10, InetAddress.getByName("127.0.0.1"));
+        }
+        catch (Exception e) {
+            fail("Unexpected TcpServer exception " + e.getMessage());
+        }
+
+        new GridTestThread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while(!Thread.currentThread().isInterrupted()) {
+                        Socket clientSock = srvSock.accept();
+
+                        connCnt++;
+
+                        // Create a new thread for socket connection.
+                        new GridTestThread(workerFactory.newWorker(clientSock)).start();
+                    }
+                }
+                catch (Exception e) {
+                    if (!srvSock.isClosed())
+                        e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    /**
+     * Stops tcp test thread
+     * @throws IOException IOException
+     */
+    private void stopTcpThread() throws IOException {
+        if (srvSock != null && !srvSock.isClosed())
+            srvSock.close();
+    }
+
+    /**
+     * Test that Client successfully ignores wrong responses during Discovery Handshake Procedure.
+     *
+     * @throws Exception in case of error.
+     */
+    public void testWrongHandshakeResponse() throws Exception {
+        startTcpThread(new SomeResponseWorker());
+
+        simpleTest();
+    }
+
+    /**
+     * Test that Client successfully ignores wrong responses during Discovery Handshake Procedure.
+     *
+     * @throws Exception in case of error.
+     */
+    public void testNoHandshakeResponse() throws Exception {
+        startTcpThread(new NoResponseWorker());
+
+        simpleTest();
+    }
+
+    /**
+     * Test that Client successfully ignores when server closes sockets after Discovery Handshake Request.
+     *
+     * @throws Exception in case of error.
+     */
+    public void testDisconnectOnRequest() throws Exception {
+        startTcpThread(new DisconnectOnRequestWorker());
+
+        simpleTest();
+    }
+
+    /**
+     * Test that Client successfully ignores when server closes sockets immediately.
+     *
+     * @throws Exception in case of error.
+     */
+    public void testEarlyDisconnect() throws Exception {
+        startTcpThread(new EarlyDisconnectWorker());
+
+        simpleTest();
+    }
+
+    /**
+     * Some simple sanity check with the Server and Client
+     * It is expected that both client and server could successfully perform Discovery Procedure when there is
+     * unknown (test) server in the ipFinder list.
+     */
+    private void simpleTest() {
+        try {
+            Ignite srv = startGrid("server");
+            Ignite client = startGrid("client");
+
+            awaitPartitionMapExchange();
+
+            assertEquals(2, srv.cluster().nodes().size());
+            assertEquals(2, client.cluster().nodes().size());
+            assertTrue(connCnt >= 2);
+
+            srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(1, 1);
+
+            assertEquals(1, client.getOrCreateCache(DEFAULT_CACHE_NAME).get(1));
+        }
+        catch (Exception e) {
+            fail("Failed with unexpected exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Just a factory for runnable workers
+     */
+    private interface WorkerFactory {
+        /**
+         * Creates a new worker for socket
+         * @param clientSock socket for worker
+         * @return runnable Worker
+         */
+        Runnable newWorker(Socket clientSock);
+    }
+
+    /**
+     * SocketWorker
+     */
+    private abstract class SocketWorker implements Runnable {
+        /** Client socket. */
+        Socket clientSock;
+
+        /**
+         * @param clientSock Client socket.
+         */
+        SocketWorker(Socket clientSock) {
+            this.clientSock = clientSock;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                InputStream input = clientSock.getInputStream();
+                OutputStream output = clientSock.getOutputStream();
+                byte[] buf = new byte[1024];
+
+                while (!clientSock.isClosed() && input.read(buf) > 0)
+                    action(input, output);
+
+                if (!clientSock.isClosed())
+                    clientSock.close();
+            }
+            catch (IOException e) {
+                log.error("Unexpected error", e);
+            }
+        }
+
+        /**
+         * @param input socket input stream
+         * @param output socket output stream
+         * @throws IOException IOException
+         */
+        public abstract void action(InputStream input, OutputStream output) throws IOException;
+    }
+
+    /**
+     * SomeResponseWorker.
+     */
+    private class SomeResponseWorker implements WorkerFactory {
+        /** {@inheritDoc} */
+        @Override public Runnable newWorker(Socket clientSock) {
+            return new SocketWorker(clientSock) {
+                @Override public void action(InputStream input, OutputStream output) throws IOException {
+                    output.write("Some response".getBytes());
+
+                    log.error("TEST: Some response was sent to " + clientSock.getRemoteSocketAddress());
+                }
+            };
+        }
+    }
+
+    /**
+     * NoResponseWorker.
+     */
+    private class NoResponseWorker implements WorkerFactory {
+        /** {@inheritDoc} */
+        @Override public Runnable newWorker(Socket clientSock) {
+            return new SocketWorker(clientSock) {
+                @Override public void action(InputStream input, OutputStream output) throws IOException {
+                    log.error("TEST: No response was sent to " + clientSock.getRemoteSocketAddress());
+               }
+            };
+        }
+    }
+
+    /**
+     * DisconnectOnRequestWorker.
+     */
+    private class DisconnectOnRequestWorker implements WorkerFactory {
+        /** {@inheritDoc} */
+        @Override public Runnable newWorker(Socket clientSock) {
+            return new SocketWorker(clientSock) {
+                @Override public void action(InputStream input, OutputStream output) throws IOException {
+                    clientSock.close();
+
+                    log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
+                }
+            };
+        }
+    }
+
+    /**
+     * EarlyDisconnectWorker.
+     */
+    private class EarlyDisconnectWorker implements WorkerFactory {
+        /** {@inheritDoc} */
+        @Override public Runnable newWorker(Socket clientSock) {
+            return new SocketWorker(clientSock) {
+                @Override public void action(InputStream input, OutputStream output) throws IOException {
+                    // No-op
+                }
+
+                @Override public void run() {
+                    try {
+                        clientSock.close();
+
+                        log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
+                    }
+                    catch (IOException e) {
+                        log.error("Unexpected error", e);
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * TcpDiscoverySpi with non-shuffled resolved IP addresses. We should ensure that in this test non-Ignite server
+     * is the first element of the addresses list
+     */
+    class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+            Collection<InetSocketAddress> shuffled = super.resolvedAddresses();
+            List<InetSocketAddress> res = new ArrayList<>(shuffled);
+
+            Collections.sort(res, new Comparator<InetSocketAddress>() {
+                @Override public int compare(InetSocketAddress o1, InetSocketAddress o2) {
+                    return o1.toString().compareTo(o2.toString());
+                }
+            });
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/035dcb7f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ff4c9c1..626875c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -43,6 +43,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest;
@@ -95,6 +96,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
         suite.addTest(new TestSuite(AuthenticationRestartTest.class));
 
+        suite.addTest(new TestSuite(TcpDiscoveryWithWrongServerTest.class));
+
         // Client connect.
         suite.addTest(new TestSuite(IgniteClientConnectTest.class));
         suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));