You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/27 22:10:45 UTC
[20/50] [abbrv] incubator-ignite git commit: Fixed SSL bugs. Added
test.
Fixed SSL bugs. Added test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e37efa33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e37efa33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e37efa33
Branch: refs/heads/ignite-104
Commit: e37efa3357d96e7831068eaec29627bd1bcc2ba0
Parents: c5dc492
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Jul 23 11:37:26 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Jul 23 11:38:35 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 5 +-
.../util/nio/ssl/BlockingSslHandler.java | 61 ++++++++++++--------
.../communication/tcp/TcpCommunicationSpi.java | 60 ++++++++++---------
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +-
.../tcp/IgniteCacheSslStartStopSelfTest.java | 46 +++++++++++++++
.../IgniteCacheFailoverTestSuite.java | 4 +-
6 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a246dc..b746261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2064,9 +2064,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
private void ackSecurity() {
assert log != null;
- if (log.isInfoEnabled())
- log.info("Security status [authentication=" + onOff(ctx.security().enabled())
- + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
+ U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled())
+ + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
index eee90d8..9890efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
@@ -39,14 +39,14 @@ public class BlockingSslHandler {
/** Logger. */
private IgniteLogger log;
- /** */
+ /** Socket channel. */
private SocketChannel ch;
- /** */
- private GridFutureAdapter<ByteBuffer> fut;
+ /** Order. */
+ private final ByteOrder order;
/** SSL engine. */
- private SSLEngine sslEngine;
+ private final SSLEngine sslEngine;
/** Handshake completion flag. */
private boolean handshakeFinished;
@@ -69,33 +69,38 @@ public class BlockingSslHandler {
/**
* @param sslEngine SSLEngine.
* @param ch Socket channel.
- * @param fut Future.
+ * @param directBuf Direct buffer flag.
+ * @param order Byte order.
* @param log Logger.
*/
- public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer> fut,
- IgniteLogger log) throws SSLException {
+ public BlockingSslHandler(SSLEngine sslEngine,
+ SocketChannel ch,
+ boolean directBuf,
+ ByteOrder order,
+ IgniteLogger log)
+ throws SSLException {
this.ch = ch;
- this.fut = fut;
this.log = log;
-
this.sslEngine = sslEngine;
+ this.order = order;
// Allocate a little bit more so SSL engine would not return buffer overflow status.
int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
- outNetBuf = ByteBuffer.allocate(netBufSize);
- inNetBuf = ByteBuffer.allocate(netBufSize);
+ outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+ outNetBuf.order(order);
// Initially buffer is empty.
outNetBuf.position(0);
outNetBuf.limit(0);
+ inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
+ inNetBuf.order(order);
+
appBuf = allocateAppBuff();
handshakeStatus = sslEngine.getHandshakeStatus();
- sslEngine.setUseClientMode(true);
-
if (log.isDebugEnabled())
log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']');
}
@@ -122,12 +127,6 @@ public class BlockingSslHandler {
case FINISHED: {
handshakeFinished = true;
- if (fut != null) {
- appBuf.flip();
-
- fut.onDone(appBuf);
- }
-
loop = false;
break;
@@ -187,6 +186,15 @@ public class BlockingSslHandler {
}
/**
+ * @return Application buffer with decoded data.
+ */
+ public ByteBuffer applicationBuffer() {
+ appBuf.flip();
+
+ return appBuf;
+ }
+
+ /**
* Encrypts data to be written to the network.
*
* @param src data to encrypt.
@@ -439,27 +447,32 @@ public class BlockingSslHandler {
int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);
- return ByteBuffer.allocate(appBufSize);
+ ByteBuffer buf = ByteBuffer.allocate(appBufSize);
+ buf.order(order);
+
+ return buf;
}
/**
* Read data from net buffer.
*/
- private void readFromNet() {
+ private void readFromNet() throws IgniteCheckedException {
try {
inNetBuf.clear();
- ch.read(inNetBuf);
+ int read = ch.read(inNetBuf);
+
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
}
catch (IOException e) {
- e.printStackTrace();
+ throw new IgniteCheckedException("Failed to write byte to socket.", e);
}
}
/**
* Copies data from out net buffer and passes it to the underlying chain.
*
- * @return Nothing.
* @throws GridNioException If send failed.
*/
private void writeNetBuffer() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 99ca2b7..48dc52e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2051,12 +2051,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
- GridTuple<SSLEngine> ssl = new GridTuple<>();
+ SSLEngine sslEngine = null;
try {
ch.socket().connect(addr, (int)connTimeout);
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl);
+ if (isSslEnabled()) {
+ sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+
+ sslEngine.setUseClientMode(true);
+ }
+
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine);
if (rcvCnt == -1)
return null;
@@ -2072,10 +2078,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
meta.put(NODE_ID_META, node.id());
if (isSslEnabled()) {
- assert ssl != null;
- assert ssl.get() != null;
+ assert sslEngine != null;
- meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get());
+ meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
}
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
@@ -2211,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
- @Nullable GridTuple<SSLEngine> ssl
+ @Nullable SSLEngine ssl
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
@@ -2233,23 +2238,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ByteBuffer buf;
if (isSslEnabled()) {
- GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>();
-
- SSLEngine sslEngine = ignite.configuration().getSslContextFactory()
- .create().createSSLEngine();
-
- sslEngine.setUseClientMode(true);
-
- sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log);
+ sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log);
if (!sslHnd.handshake())
- throw new IgniteCheckedException("SSL handshake isn't completed.");
-
- ssl.set(sslEngine);
+ throw new IgniteCheckedException("SSL handshake is not completed.");
- ByteBuffer handBuff = handFut.get();
+ ByteBuffer handBuff = sslHnd.applicationBuffer();
- if (handBuff.limit() < 17) {
+ if (handBuff.remaining() < 17) {
buf = ByteBuffer.allocate(1000);
int read = ch.read(buf);
@@ -2338,18 +2334,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
buf = ByteBuffer.allocate(1000);
+ ByteBuffer decode = null;
+
buf.order(ByteOrder.nativeOrder());
- int read = ch.read(buf);
+ for (int i = 0; i < 9; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- buf.flip();
+ buf.flip();
+
+ decode = sslHnd.decode(buf);
- rcvCnt = sslHnd.decode(buf).getLong(1);
- } else {
+ i += decode.remaining();
+
+ buf.flip();
+ buf.compact();
+ }
+
+ rcvCnt = decode.getLong(1);
+ }
+ else {
buf = ByteBuffer.allocate(9);
buf.order(ByteOrder.nativeOrder());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 34f90f7..68552a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4242,10 +4242,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
- if (X.hasCause(e, SSLException.class) && spi.isSslEnabled())
+ if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " +
"Missed SSL configuration on node? [sock=" + sock + ']');
- else if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) {
+ else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
+ && !spi.isNodeStopping0()) {
if (U.isMacInvalidArgumentError(e))
LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
U.MAC_INVALID_ARG_MSG);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
new file mode 100644
index 0000000..9bf6caa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.testframework.*;
+
+/**
+ *
+ */
+public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setSslContextFactory(GridTestUtils.sslFactory());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int keysCount() {
+ return 60_000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 80bfbf2..524bfb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.testframework.*;
import java.util.*;
@@ -75,6 +75,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);
+ suite.addTestSuite(IgniteCacheSslStartStopSelfTest.class);
+
return suite;
}
}