You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/30 11:44:04 UTC

[ignite] branch master updated: IGNITE-15921 Thin client: drop connection on invalid handshake without allocating buffer (#9610)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 85bb788  IGNITE-15921 Thin client: drop connection on invalid handshake without allocating buffer (#9610)
85bb788 is described below

commit 85bb788bc6ab21668124fe07bee3d984752bc2c5
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue Nov 30 14:43:34 2021 +0300

    IGNITE-15921 Thin client: drop connection on invalid handshake without allocating buffer (#9610)
    
    * Check first 3 bytes of the handshake message without allocating the buffer, drop connection in case of garbage data.
    * Add 64 MiB handshake size limit.
---
 .../odbc/ClientListenerNioMessageParser.java       | 16 ++---
 .../internal/processors/odbc/ClientMessage.java    | 70 +++++++++++++++++++++-
 .../org/apache/ignite/client/ConnectionTest.java   | 49 +++++++++++++++
 .../ClientProtocolCompatibilityTest.cs             |  6 +-
 .../Client/RawSocketTest.cs                        | 34 ++++++++++-
 5 files changed, 160 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
index 9c01770..5210171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.util.nio.GridNioParser;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * This class implements stream parser.
@@ -42,8 +41,8 @@ public class ClientListenerNioMessageParser implements GridNioParser {
     /** Message metadata key. */
     static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
-    /** Reader metadata key. */
-    static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+    /** First message key. */
+    static final int FIRST_MESSAGE_RECEIVED_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
     private final IgniteLogger log;
@@ -55,19 +54,22 @@ public class ClientListenerNioMessageParser implements GridNioParser {
 
     /** {@inheritDoc} */
     @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
-        Message msg = ses.removeMeta(MSG_META_KEY);
+        ClientMessage msg = ses.removeMeta(MSG_META_KEY);
 
         try {
             if (msg == null)
-                msg = new ClientMessage();
+                msg = new ClientMessage(ses.meta(FIRST_MESSAGE_RECEIVED_KEY) == null);
 
             boolean finished = false;
 
             if (buf.hasRemaining())
-                finished = msg.readFrom(buf, null);
+                finished = msg.readFrom(buf);
+
+            if (finished) {
+                ses.addMeta(FIRST_MESSAGE_RECEIVED_KEY, true);
 
-            if (finished)
                 return msg;
+            }
             else {
                 ses.addMeta(MSG_META_KEY, msg);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
index 9c09939..c6e7f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
@@ -35,6 +35,18 @@ public class ClientMessage implements Message, Externalizable {
     /** */
     private static final long serialVersionUID = -4609408156037304495L;
 
+    /** Limit handshake size to 64 MiB. */
+    private static final int MAX_HANDSHAKE_SIZE = 64 * 1024 * 1024;
+
+    /** First 3 bytes in handshake are either 1 1 0 (handshake = 1, major version = 1)... */
+    private static final int HANDSHAKE_HEADER = 1 + (1 << 8);
+
+    /** ...or 1 2 0 (handshake = 1, major version = 2). */
+    private static final int HANDSHAKE_HEADER2 = 1 + (2 << 8);
+
+    /** */
+    private final boolean isFirstMessage;
+
     /** */
     private byte[] data;
 
@@ -48,16 +60,29 @@ public class ClientMessage implements Message, Externalizable {
     private int msgSize;
 
     /** */
-    public ClientMessage() {}
+    private int firstMessageHeader;
+
+    /** Creates an empty message that is not treated as the first (handshake) message of a connection. */
+    public ClientMessage() {
+        isFirstMessage = false;
+    }
+
+    /** @param isFirstMessage Whether this is the first message of a connection, i.e. a handshake to be validated. */
+    public ClientMessage(boolean isFirstMessage) {
+        this.isFirstMessage = isFirstMessage;
+    }
 
     /** */
     public ClientMessage(byte[] data) {
+        //noinspection AssignmentOrReturnOfFieldWithMutableType
         this.data = data;
+        isFirstMessage = false;
     }
 
     /** */
     public ClientMessage(BinaryHeapOutputStream stream) {
         this.stream = stream;
+        isFirstMessage = false;
     }
 
     /** {@inheritDoc} */
@@ -109,6 +134,16 @@ public class ClientMessage implements Message, Externalizable {
 
     /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads this message from provided byte buffer.
+     *
+     * @param buf Byte buffer.
+     * @return Whether message was fully read.
+     */
+    public boolean readFrom(ByteBuffer buf) throws IOException {
         if (cnt < 0) {
             for (; cnt < 0 && buf.hasRemaining(); cnt++)
                 msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
@@ -116,10 +151,13 @@ public class ClientMessage implements Message, Externalizable {
             if (cnt < 0)
                 return false;
 
-            data = new byte[msgSize];
+            if (msgSize <= 0)
+                throw new IOException("Message size must be greater than 0: " + msgSize);
+
+            if (isFirstMessage && msgSize > MAX_HANDSHAKE_SIZE)
+                throw new IOException("Client handshake size limit exceeded: " + msgSize + " > " + MAX_HANDSHAKE_SIZE);
         }
 
-        assert data != null;
         assert cnt >= 0;
         assert msgSize > 0;
 
@@ -131,6 +169,31 @@ public class ClientMessage implements Message, Externalizable {
             if (missing > 0) {
                 int len = Math.min(missing, remaining);
 
+                if (isFirstMessage) {
+                    // Sanity check: first 3 bytes in handshake are always 1 1 0 or 1 2 0 (handshake = 1, major version = 1 or 2).
+                    // Do not allocate the buffer before validating the header to protect us from garbage data sent by unrelated application
+                    // connecting on our port by accident.
+                    while (len > 0 && cnt < 3) {
+                        firstMessageHeader |= (buf.get() & 0xFF) << (8 * cnt);
+                        cnt++;
+                        len--;
+                    }
+
+                    if (cnt < 3)
+                        return false;
+
+                    if (firstMessageHeader != HANDSHAKE_HEADER && firstMessageHeader != HANDSHAKE_HEADER2)
+                        throw new IOException("Handshake header check failed: " + firstMessageHeader);
+
+                    // Header is valid, create buffer and set first bytes.
+                    data = new byte[msgSize];
+                    data[0] = 1;
+                    data[1] = (byte)(firstMessageHeader >> 8);
+                }
+
+                if (data == null)
+                    data = new byte[msgSize];
+
                 buf.get(data, cnt, len);
 
                 cnt += len;
@@ -172,6 +235,7 @@ public class ClientMessage implements Message, Externalizable {
             stream = null;
         }
 
+        //noinspection AssignmentOrReturnOfFieldWithMutableType
         return data;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/client/ConnectionTest.java
index b394f3c..1f193d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.client;
 
+import java.io.OutputStream;
+import java.net.Socket;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Checks if it can connect to a valid address from the node address list.
  */
@@ -75,6 +79,51 @@ public class ConnectionTest {
         testConnection(IPv6_HOST, "[::1]:10800");
     }
 
+    /** Little-endian size 0x04000001 (64 MiB + 1) exceeds the handshake limit: server must drop the connection. */
+    @Test
+    public void testHandshakeTooLargeServerDropsConnection() throws Exception {
+        try (LocalIgniteCluster ignored = LocalIgniteCluster.start(1, IPv4_HOST);
+             Socket clientSocket = new Socket(IPv4_HOST, 10800)) {
+            OutputStream stream = clientSocket.getOutputStream();
+
+            stream.write(new byte[]{1, 0, 0, 4});
+            stream.flush();
+
+            // Read returns -1 when end of stream has been reached, blocks otherwise.
+            assertEquals(-1, clientSocket.getInputStream().read());
+        }
+    }
+
+    /** Size prefix 0xFFFFFFFF decodes to a negative message size: server must drop the connection. */
+    @Test
+    public void testNegativeMessageSizeDropsConnection() throws Exception {
+        try (LocalIgniteCluster ignored = LocalIgniteCluster.start(1, IPv4_HOST);
+             Socket clientSocket = new Socket(IPv4_HOST, 10800)) {
+            OutputStream stream = clientSocket.getOutputStream();
+
+            byte b = (byte)255;
+            stream.write(new byte[]{b, b, b, b});
+            stream.flush();
+
+            // Read returns -1 when end of stream has been reached, blocks otherwise.
+            assertEquals(-1, clientSocket.getInputStream().read());
+        }
+    }
+
+    /** Valid size (10) followed by 3 bytes that are not a handshake header: server must drop the connection. */
+    @Test
+    public void testInvalidHandshakeHeaderDropsConnection() throws Exception {
+        try (LocalIgniteCluster ignored = LocalIgniteCluster.start(1, IPv4_HOST);
+             Socket clientSocket = new Socket(IPv4_HOST, 10800)) {
+            OutputStream stream = clientSocket.getOutputStream();
+
+            stream.write(new byte[]{10, 0, 0, 0, 42, 42, 42});
+            stream.flush();
+
+            assertEquals(-1, clientSocket.getInputStream().read());
+        }
+    }
+
     /**
      * @param addrs Addresses to connect.
      * @param host LocalIgniteCluster host.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compatibility/ClientProtocolCompatibilityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compatibility/ClientProtocolCompatibilityTest.cs
index c04a01f..6fa058c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compatibility/ClientProtocolCompatibilityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compatibility/ClientProtocolCompatibilityTest.cs
@@ -124,7 +124,7 @@ namespace Apache.Ignite.Core.Tests.Client.Compatibility
         public void TestClientNewerThanServerReconnectsOnServerVersion()
         {
             // Use a non-existent version that is not supported by the server
-            var version = new ClientProtocolVersion(short.MaxValue, short.MaxValue, short.MaxValue);
+            var version = new ClientProtocolVersion(1, short.MaxValue, short.MaxValue);
 
             using (var client = GetClient(version))
             {
@@ -133,8 +133,8 @@ namespace Apache.Ignite.Core.Tests.Client.Compatibility
                 var logs = GetLogs(client);
 
                 var expectedMessage = "Handshake failed on 127.0.0.1:10800, " +
-                                      "requested protocol version = 32767.32767.32767, server protocol version = , " +
-                                      "status = Fail, message = Unsupported version: 32767.32767.32767";
+                                      "requested protocol version = 1.32767.32767, server protocol version = , " +
+                                      "status = Fail, message = Unsupported version: 1.32767.32767";
 
                 var message = Regex.Replace(
                     logs[2].Message, @"server protocol version = \d\.\d\.\d", "server protocol version = ");
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
index 8ab110f..057e945 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Core.Tests.Client
 {
     using System;
+    using System.Collections.Generic;
     using System.Net;
     using System.Net.Sockets;
     using Apache.Ignite.Core.Client;
@@ -32,6 +33,17 @@ namespace Apache.Ignite.Core.Tests.Client
     /// </summary>
     public class RawSocketTest : ClientTestBase
     {
+        /** */
+        private readonly List<Socket> _createdSockets = new List<Socket>();
+
+        /** Disposes every socket tracked in _createdSockets after each test and empties the list. */
+        [TearDown]
+        public void TearDown()
+        {
+            _createdSockets.ForEach(x => x.Dispose());
+            _createdSockets.Clear();
+        }
+
         /// <summary>
         /// Tests the socket handshake connection.
         /// </summary>
@@ -131,10 +143,25 @@ namespace Apache.Ignite.Core.Tests.Client
         }
 
         /// <summary>
+        /// Tests that invalid handshake data causes the connection to be closed by the server.
+        /// </summary>
+        [Test]
+        public void TestInvalidHandshakeClosesConnection()
+        {
+            var sock = GetSocket(ClientConnectorConfiguration.DefaultPort);
+            Assert.IsTrue(sock.Connected);
+            // 4-byte size (~16 MiB, within the limit) + full 3-byte header 1 1 1; valid headers end with 0, so the check fails.
+            sock.Send(new byte[] { 1, 1, 1, 1, 1, 1, 1 });
+
+            // Receive returns 0 when connection has been closed, blocks otherwise.
+            Assert.AreEqual(0, sock.Receive(new byte[4]));
+        }
+
+        /// <summary>
         /// Gets the socket.
         /// </summary>
         /// <returns>Connected socket after handshake.</returns>
-        private static Socket GetSocket()
+        private Socket GetSocket()
         {
             var sock = GetSocket(ClientConnectorConfiguration.DefaultPort);
             Assert.IsTrue(sock.Connected);
@@ -210,11 +237,14 @@ namespace Apache.Ignite.Core.Tests.Client
         /// <summary>
         /// Gets the socket.
         /// </summary>
-        private static Socket GetSocket(int port)
+        private Socket GetSocket(int port)
         {
             var endPoint = new IPEndPoint(IPAddress.Loopback, port);
             var sock = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
             sock.Connect(endPoint);
+
+            _createdSockets.Add(sock);
+
             return sock;
         }
     }