You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/24 14:51:01 UTC

[20/30] incubator-ignite git commit: Fixed SSL bugs. Added test.

Fixed SSL bugs. Added test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e37efa33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e37efa33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e37efa33

Branch: refs/heads/master
Commit: e37efa3357d96e7831068eaec29627bd1bcc2ba0
Parents: c5dc492
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Jul 23 11:37:26 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Jul 23 11:38:35 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  5 +-
 .../util/nio/ssl/BlockingSslHandler.java        | 61 ++++++++++++--------
 .../communication/tcp/TcpCommunicationSpi.java  | 60 ++++++++++---------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  5 +-
 .../tcp/IgniteCacheSslStartStopSelfTest.java    | 46 +++++++++++++++
 .../IgniteCacheFailoverTestSuite.java           |  4 +-
 6 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a246dc..b746261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2064,9 +2064,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     private void ackSecurity() {
         assert log != null;
 
-        if (log.isInfoEnabled())
-            log.info("Security status [authentication=" + onOff(ctx.security().enabled())
-                + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
+        U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled())
+            + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
index eee90d8..9890efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
@@ -39,14 +39,14 @@ public class BlockingSslHandler {
     /** Logger. */
     private IgniteLogger log;
 
-    /** */
+    /** Socket channel. */
     private SocketChannel ch;
 
-    /** */
-    private GridFutureAdapter<ByteBuffer> fut;
+    /** Order. */
+    private final ByteOrder order;
 
     /** SSL engine. */
-    private SSLEngine sslEngine;
+    private final SSLEngine sslEngine;
 
     /** Handshake completion flag. */
     private boolean handshakeFinished;
@@ -69,33 +69,38 @@ public class BlockingSslHandler {
     /**
      * @param sslEngine SSLEngine.
      * @param ch Socket channel.
-     * @param fut Future.
+     * @param directBuf Direct buffer flag.
+     * @param order Byte order.
      * @param log Logger.
      */
-    public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer> fut,
-        IgniteLogger log) throws SSLException {
+    public BlockingSslHandler(SSLEngine sslEngine,
+        SocketChannel ch,
+        boolean directBuf,
+        ByteOrder order,
+        IgniteLogger log)
+        throws SSLException {
         this.ch = ch;
-        this.fut = fut;
         this.log = log;
-
         this.sslEngine = sslEngine;
+        this.order = order;
 
         // Allocate a little bit more so SSL engine would not return buffer overflow status.
         int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
 
-        outNetBuf = ByteBuffer.allocate(netBufSize);
-        inNetBuf = ByteBuffer.allocate(netBufSize);
+        outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+        outNetBuf.order(order);
 
         // Initially buffer is empty.
         outNetBuf.position(0);
         outNetBuf.limit(0);
 
+        inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+        inNetBuf.order(order);
+
         appBuf = allocateAppBuff();
 
         handshakeStatus = sslEngine.getHandshakeStatus();
 
-        sslEngine.setUseClientMode(true);
-
         if (log.isDebugEnabled())
             log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']');
     }
@@ -122,12 +127,6 @@ public class BlockingSslHandler {
                 case FINISHED: {
                     handshakeFinished = true;
 
-                    if (fut != null) {
-                        appBuf.flip();
-
-                        fut.onDone(appBuf);
-                    }
-
                     loop = false;
 
                     break;
@@ -187,6 +186,15 @@ public class BlockingSslHandler {
     }
 
     /**
+     * @return Application buffer with decoded data.
+     */
+    public ByteBuffer applicationBuffer() {
+        appBuf.flip();
+
+        return appBuf;
+    }
+
+    /**
      * Encrypts data to be written to the network.
      *
      * @param src data to encrypt.
@@ -439,27 +447,32 @@ public class BlockingSslHandler {
 
         int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);
 
-        return ByteBuffer.allocate(appBufSize);
+        ByteBuffer buf = ByteBuffer.allocate(appBufSize);
+        buf.order(order);
+
+        return buf;
     }
 
     /**
      * Read data from net buffer.
      */
-    private void readFromNet() {
+    private void readFromNet() throws IgniteCheckedException {
         try {
             inNetBuf.clear();
 
-            ch.read(inNetBuf);
+            int read = ch.read(inNetBuf);
+
+            if (read == -1)
+                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
         }
         catch (IOException e) {
-            e.printStackTrace();
+            throw new IgniteCheckedException("Failed to write byte to socket.", e);
         }
     }
 
     /**
      * Copies data from out net buffer and passes it to the underlying chain.
      *
-     * @return Nothing.
      * @throws GridNioException If send failed.
      */
     private void writeNetBuffer() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 99ca2b7..48dc52e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2051,12 +2051,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     long rcvCnt = -1;
 
-                    GridTuple<SSLEngine> ssl = new GridTuple<>();
+                    SSLEngine sslEngine = null;
 
                     try {
                         ch.socket().connect(addr, (int)connTimeout);
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl);
+                        if (isSslEnabled()) {
+                            sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+
+                            sslEngine.setUseClientMode(true);
+                        }
+
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2072,10 +2078,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         meta.put(NODE_ID_META, node.id());
 
                         if (isSslEnabled()) {
-                            assert ssl != null;
-                            assert ssl.get() != null;
+                            assert sslEngine != null;
 
-                            meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get());
+                            meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
                         }
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
@@ -2211,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
-        @Nullable GridTuple<SSLEngine> ssl
+        @Nullable SSLEngine ssl
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
@@ -2233,23 +2238,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     ByteBuffer buf;
 
                     if (isSslEnabled()) {
-                        GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>();
-
-                        SSLEngine sslEngine = ignite.configuration().getSslContextFactory()
-                            .create().createSSLEngine();
-
-                        sslEngine.setUseClientMode(true);
-
-                        sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log);
+                        sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log);
 
                         if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake isn't completed.");
-
-                        ssl.set(sslEngine);
+                            throw new IgniteCheckedException("SSL handshake is not completed.");
 
-                        ByteBuffer handBuff = handFut.get();
+                        ByteBuffer handBuff = sslHnd.applicationBuffer();
 
-                        if (handBuff.limit() < 17) {
+                        if (handBuff.remaining() < 17) {
                             buf = ByteBuffer.allocate(1000);
 
                             int read = ch.read(buf);
@@ -2338,18 +2334,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             buf = ByteBuffer.allocate(1000);
 
+                            ByteBuffer decode = null;
+
                             buf.order(ByteOrder.nativeOrder());
 
-                            int read = ch.read(buf);
+                            for (int i = 0; i < 9; ) {
+                                int read = ch.read(buf);
 
-                            if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
-                                    "(connection closed).");
+                                if (read == -1)
+                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
+                                        "(connection closed).");
 
-                            buf.flip();
+                                buf.flip();
+
+                                decode = sslHnd.decode(buf);
 
-                            rcvCnt = sslHnd.decode(buf).getLong(1);
-                        } else {
+                                i += decode.remaining();
+
+                                buf.flip();
+                                buf.compact();
+                            }
+
+                            rcvCnt = decode.getLong(1);
+                        }
+                        else {
                             buf = ByteBuffer.allocate(9);
 
                             buf.order(ByteOrder.nativeOrder());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 34f90f7..68552a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4242,10 +4242,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
-                    if (X.hasCause(e, SSLException.class) && spi.isSslEnabled())
+                    if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
                         LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " +
                             "Missed SSL configuration on node? [sock=" + sock + ']');
-                    else if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) {
+                    else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
+                        && !spi.isNodeStopping0()) {
                         if (U.isMacInvalidArgumentError(e))
                             LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
                                 U.MAC_INVALID_ARG_MSG);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
new file mode 100644
index 0000000..9bf6caa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.testframework.*;
+
+/**
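+ * Runs the parent put-retry test with an SSL context factory configured (ATOMIC atomicity mode, 60_000 keys).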
+ */
+public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setSslContextFactory(GridTestUtils.sslFactory());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int keysCount() {
+        return 60_000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 80bfbf2..524bfb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.testframework.*;
 
 import java.util.*;
@@ -75,6 +75,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
         suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheSslStartStopSelfTest.class);
+
         return suite;
     }
 }