You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/20 14:26:21 UTC
[1/2] incubator-ignite git commit: IGNITE-323 Added ssl for
TcpCommunication.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-323 [created] c84784525
IGNITE-323 Added ssl for TcpCommunication.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/730b1046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/730b1046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/730b1046
Branch: refs/heads/ignite-323
Commit: 730b1046e32651d953587fb1f5851c299baa4902
Parents: 13e55b2
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jul 20 13:56:47 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Jul 20 15:20:09 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 27 ++
.../ignite/internal/util/lang/GridFunc.java | 32 ++
.../util/nio/GridNioSessionMetaKey.java | 5 +-
.../util/nio/ssl/BlockingSslHandler.java | 473 +++++++++++++++++++
.../internal/util/nio/ssl/GridNioSslFilter.java | 2 +
.../util/nio/ssl/GridNioSslHandler.java | 12 +-
.../communication/tcp/TcpCommunicationSpi.java | 166 ++++++-
.../GridAbstractCommunicationSelfTest.java | 16 +
.../tcp/GridTcpCommunicationSpiSslSelfTest.java | 38 ++
.../ignite/testframework/junits/IgniteMock.java | 13 +
10 files changed, 761 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..73db5fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.client.ssl.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -400,6 +401,9 @@ public class IgniteConfiguration {
/** Cache store session listeners. */
private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
+ /** SSL connection factory. */
+ private GridSslContextFactory sslCtxFactory;
+
/**
* Creates valid grid configuration with all default values.
*/
@@ -480,6 +484,7 @@ public class IgniteConfiguration {
segResolvers = cfg.getSegmentationResolvers();
sndRetryCnt = cfg.getNetworkSendRetryCount();
sndRetryDelay = cfg.getNetworkSendRetryDelay();
+ sslCtxFactory = cfg.getSslContextFactory();
storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
svcCfgs = cfg.getServiceConfiguration();
sysPoolSize = cfg.getSystemThreadPoolSize();
@@ -1280,6 +1285,28 @@ public class IgniteConfiguration {
}
/**
+ * Sets SSL context factory that will be used for creating a secure socket layer.
+ *
+ * @param sslCtxFactory Ssl context factory.
+ * @see GridSslContextFactory
+ */
+ public IgniteConfiguration setSslContextFactory(GridSslContextFactory sslCtxFactory) {
+ this.sslCtxFactory = sslCtxFactory;
+
+ return this;
+ }
+
+ /**
+ * Returns SSL context factory that will be used for creating a secure socket layer.
+ *
+ * @return SSL connection factory.
+ * @see GridSslContextFactory
+ */
+ public GridSslContextFactory getSslContextFactory() {
+ return sslCtxFactory;
+ }
+
+ /**
* Returns a collection of life-cycle beans. These beans will be automatically
* notified of grid life-cycle events. Use life-cycle beans whenever you
* want to perform certain logic before and after grid startup and stopping
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..a202e9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3965,6 +3965,38 @@ public class GridFunc {
}
/**
+ * Creates map with given values.
+ *
+ * @param k1 Key 1.
+ * @param v1 Value 1.
+ * @param k2 Key 2.
+ * @param v2 Value 2.
+ * @param k3 Key 3.
+ * @param v3 Value 3.
+ * @param k4 Key 4.
+ * @param v4 Value 4.
+ * @param k5 Key 5.
+ * @param v5 Value 5.
+ * @param k6 Key 6.
+ * @param v6 Value 6.
+ * @param <K> Key's type.
+ * @param <V> Value's type.
+ * @return Created map.
+ */
+ public static <K, V> Map<K, V> asMap(K k1, V v1, K k2, V v2, K k3, V v3, K k4, V v4, K k5, V v5, K k6, V v6) {
+ Map<K, V> map = new GridLeanMap<>(5);
+
+ map.put(k1, v1);
+ map.put(k2, v2);
+ map.put(k3, v3);
+ map.put(k4, v4);
+ map.put(k5, v5);
+ map.put(k6, v6);
+
+ return map;
+ }
+
+ /**
* Convenience method to convert multiple elements into array.
*
* @param t Elements to convert into array.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index d7eb2f3..a4435f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -42,7 +42,10 @@ public enum GridNioSessionMetaKey {
MARSHALLER_ID,
/** Message writer. */
- MSG_WRITER;
+ MSG_WRITER,
+
+ /** Ssl engine. */
+ SSL_ENGINE;
/** Maximum count of NIO session keys in system. */
public static final int MAX_KEYS_CNT = 64;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
new file mode 100644
index 0000000..fd4dc43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio.ssl;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.net.ssl.*;
+import javax.net.ssl.SSLEngineResult.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
+import static javax.net.ssl.SSLEngineResult.Status.*;
+import static org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.*;
+
+/**
+ *
+ */
+public class BlockingSslHandler {
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** */
+ private SocketChannel ch;
+
+ /** */
+ private GridFutureAdapter<ByteBuffer> fut;
+
+ /** SSL engine. */
+ private SSLEngine sslEngine;
+
+ /** Handshake completion flag. */
+ private boolean handshakeFinished;
+
+ /** Engine handshake status. */
+ private HandshakeStatus handshakeStatus;
+
+ /** Output buffer into which encrypted data will be written. */
+ private ByteBuffer outNetBuf;
+
+ /** Input buffer from which SSL engine will decrypt data. */
+ private ByteBuffer inNetBuf;
+
+ /** Empty buffer used in handshake procedure. */
+ private ByteBuffer handshakeBuf = ByteBuffer.allocate(0);
+
+ /** Application buffer. */
+ private ByteBuffer appBuf;
+
+ /**
+ * @param sslEngine SSLEngine.
+ * @param ch Socket channel.
+ * @param fut Future.
+ * @param log Logger.
+ */
+ public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer> fut,
+ IgniteLogger log) throws SSLException {
+ this.ch = ch;
+ this.fut = fut;
+ this.log = log;
+
+ this.sslEngine = sslEngine;
+
+ // Allocate a little bit more so SSL engine would not return buffer overflow status.
+ int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
+
+ outNetBuf = ByteBuffer.allocate(netBufSize);
+ inNetBuf = ByteBuffer.allocate(netBufSize);
+
+ // Initially buffer is empty.
+ outNetBuf.position(0);
+ outNetBuf.limit(0);
+
+ appBuf = allocateAppBuff();
+
+ handshakeStatus = sslEngine.getHandshakeStatus();
+
+ sslEngine.setUseClientMode(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']');
+ }
+
+ /**
+ * Performs handshake procedure with remote peer.
+ *
+ * @throws GridNioException If filter processing has thrown an exception.
+ * @throws SSLException If failed to process SSL data.
+ */
+ public boolean handshake() throws IgniteCheckedException, SSLException {
+ if (log.isDebugEnabled())
+ log.debug("Entered handshake(): [handshakeStatus=" + handshakeStatus + ']');
+
+ sslEngine.beginHandshake();
+
+ handshakeStatus = sslEngine.getHandshakeStatus();
+
+ boolean loop = true;
+
+ while (loop) {
+ switch (handshakeStatus) {
+ case NOT_HANDSHAKING:
+ case FINISHED: {
+ handshakeFinished = true;
+
+ if (fut != null) {
+ appBuf.flip();
+
+ fut.onDone(appBuf);
+ }
+
+ loop = false;
+
+ break;
+ }
+
+ case NEED_TASK: {
+ handshakeStatus = runTasks();
+
+ break;
+ }
+
+ case NEED_UNWRAP: {
+ Status status = unwrapHandshake();
+
+ handshakeStatus = sslEngine.getHandshakeStatus();
+
+ if (status == BUFFER_UNDERFLOW && sslEngine.isInboundDone())
+ // Either there is no enough data in buffer or session was closed.
+ loop = false;
+
+ break;
+ }
+
+ case NEED_WRAP: {
+ // If the output buffer has remaining data, clear it.
+ if (outNetBuf.hasRemaining())
+ U.warn(log, "Output net buffer has unsent bytes during handshake (will clear). ");
+
+ outNetBuf.clear();
+
+ SSLEngineResult res = sslEngine.wrap(handshakeBuf, outNetBuf);
+
+ outNetBuf.flip();
+
+ handshakeStatus = res.getHandshakeStatus();
+
+ if (log.isDebugEnabled())
+ log.debug("Wrapped handshake data [status=" + res.getStatus() + ", handshakeStatus=" +
+ handshakeStatus + ']');
+
+ writeNetBuffer();
+
+ break;
+ }
+
+ default: {
+ throw new IllegalStateException("Invalid handshake status in handshake method [handshakeStatus=" +
+ handshakeStatus + ']');
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Leaved handshake(): [handshakeStatus=" + handshakeStatus + ']');
+
+ return handshakeFinished;
+ }
+
+ /**
+ * Encrypts data to be written to the network.
+ *
+ * @param src data to encrypt.
+ * @throws SSLException on errors.
+ * @return Output buffer with encrypted data.
+ */
+ public ByteBuffer encrypt(ByteBuffer src) throws SSLException {
+ assert handshakeFinished;
+
+ // The data buffer is (must be) empty, we can reuse the entire
+ // buffer.
+ outNetBuf.clear();
+
+ // Loop until there is no more data in src
+ while (src.hasRemaining()) {
+ int outNetRemaining = outNetBuf.capacity() - outNetBuf.position();
+
+ if (outNetRemaining < src.remaining() * 2) {
+ outNetBuf = expandBuffer(outNetBuf, Math.max(
+ outNetBuf.position() + src.remaining() * 2, outNetBuf.capacity() * 2));
+
+ if (log.isDebugEnabled())
+ log.debug("Expanded output net buffer: " + outNetBuf.capacity());
+ }
+
+ SSLEngineResult res = sslEngine.wrap(src, outNetBuf);
+
+ if (log.isDebugEnabled())
+ log.debug("Encrypted data [status=" + res.getStatus() + ", handshakeStaus=" +
+ res.getHandshakeStatus() + ']');
+
+ if (res.getStatus() == OK) {
+ if (res.getHandshakeStatus() == NEED_TASK)
+ runTasks();
+ }
+ else
+ throw new SSLException("Failed to encrypt data (SSL engine error) [status=" + res.getStatus() +
+ ", handshakeStatus=" + res.getHandshakeStatus() + ']');
+ }
+
+ outNetBuf.flip();
+
+ return outNetBuf;
+ }
+
+ /**
+ * Called by SSL filter when new message was received.
+ *
+ * @param buf Received message.
+ * @throws GridNioException If exception occurred while forwarding events to underlying filter.
+ * @throws SSLException If failed to process SSL data.
+ */
+ public ByteBuffer decode(ByteBuffer buf) throws IgniteCheckedException, SSLException {
+ inNetBuf.clear();
+
+ if (buf.limit() > inNetBuf.remaining()) {
+ inNetBuf = expandBuffer(inNetBuf, inNetBuf.capacity() + buf.limit() * 2);
+
+ appBuf = expandBuffer(appBuf, inNetBuf.capacity() * 2);
+
+ if (log.isDebugEnabled())
+ log.debug("Expanded buffers [inNetBufCapacity=" + inNetBuf.capacity() + ", appBufCapacity=" +
+ appBuf.capacity() + ']');
+ }
+
+ // append buf to inNetBuffer
+ inNetBuf.put(buf);
+
+ if (!handshakeFinished)
+ handshake();
+ else
+ unwrapData();
+
+ if (isInboundDone()) {
+ int newPosition = buf.position() - inNetBuf.position();
+
+ if (newPosition >= 0) {
+ buf.position(newPosition);
+
+ // If we received close_notify but not all bytes has been read by SSL engine, print a warning.
+ if (buf.hasRemaining())
+ U.warn(log, "Got unread bytes after receiving close_notify message (will ignore).");
+ }
+
+ inNetBuf.clear();
+ }
+
+ appBuf.flip();
+
+ return appBuf;
+ }
+
+ /**
+ * @return {@code True} if inbound data stream has ended, i.e. SSL engine received
+ * <tt>close_notify</tt> message.
+ */
+ boolean isInboundDone() {
+ return sslEngine.isInboundDone();
+ }
+
+ /**
+ * Unwraps user data to the application buffer.
+ *
+ * @throws SSLException If failed to process SSL data.
+ * @throws GridNioException If failed to pass events to the next filter.
+ */
+ private void unwrapData() throws IgniteCheckedException, SSLException {
+ if (log.isDebugEnabled())
+ log.debug("Unwrapping received data.");
+
+ // Flip buffer so we can read it.
+ inNetBuf.flip();
+
+ SSLEngineResult res = unwrap0();
+
+ // prepare to be written again
+ inNetBuf.compact();
+
+ checkStatus(res);
+
+ renegotiateIfNeeded(res);
+ }
+
+ /**
+ * Runs all tasks needed to continue SSL work.
+ *
+ * @return Handshake status after running all tasks.
+ */
+ private HandshakeStatus runTasks() {
+ Runnable runnable;
+
+ while ((runnable = sslEngine.getDelegatedTask()) != null) {
+ if (log.isDebugEnabled())
+ log.debug("Running SSL engine task: " + runnable + '.');
+
+ runnable.run();
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Finished running SSL engine tasks. HandshakeStatus: " + sslEngine.getHandshakeStatus());
+
+ return sslEngine.getHandshakeStatus();
+ }
+
+
+ /**
+ * Unwraps handshake data and processes it.
+ *
+ * @return Status.
+ * @throws SSLException If SSL exception occurred while unwrapping.
+ * @throws GridNioException If failed to pass event to the next filter.
+ */
+ private Status unwrapHandshake() throws SSLException, IgniteCheckedException {
+ // Flip input buffer so we can read the collected data.
+ readFromNet();
+
+ inNetBuf.flip();
+
+ SSLEngineResult res = unwrap0();
+ handshakeStatus = res.getHandshakeStatus();
+
+ checkStatus(res);
+
+ // If handshake finished, no data was produced, and the status is still ok,
+ // try to unwrap more
+ if (handshakeStatus == FINISHED && res.getStatus() == OK && inNetBuf.hasRemaining()) {
+ res = unwrap0();
+
+ handshakeStatus = res.getHandshakeStatus();
+
+ // prepare to be written again
+ inNetBuf.compact();
+
+ renegotiateIfNeeded(res);
+ }
+ else
+ // prepare to be written again
+ inNetBuf.compact();
+
+ return res.getStatus();
+ }
+
+ /**
+ * Performs raw unwrap from network read buffer.
+ *
+ * @return Result.
+ * @throws SSLException If SSL exception occurs.
+ */
+ private SSLEngineResult unwrap0() throws SSLException {
+ SSLEngineResult res;
+
+ do {
+ res = sslEngine.unwrap(inNetBuf, appBuf);
+
+ if (log.isDebugEnabled())
+ log.debug("Unwrapped raw data [status=" + res.getStatus() + ", handshakeStatus=" +
+ res.getHandshakeStatus() + ']');
+
+ if (res.getStatus() == Status.BUFFER_OVERFLOW)
+ appBuf = expandBuffer(appBuf, appBuf.capacity() * 2);
+ }
+ while ((res.getStatus() == OK || res.getStatus() == Status.BUFFER_OVERFLOW) &&
+ (handshakeFinished && res.getHandshakeStatus() == NOT_HANDSHAKING
+ || res.getHandshakeStatus() == NEED_UNWRAP));
+
+ return res;
+ }
+
+ /**
+ * @param res SSL engine result.
+ * @throws SSLException If status is not acceptable.
+ */
+ private void checkStatus(SSLEngineResult res)
+ throws SSLException {
+
+ Status status = res.getStatus();
+
+ if (status != OK && status != CLOSED && status != BUFFER_UNDERFLOW)
+ throw new SSLException("Failed to unwrap incoming data (SSL engine error). Status: " + status);
+ }
+
+ /**
+ * Check status and retry the negotiation process if needed.
+ *
+ * @param res Result.
+ * @throws GridNioException If exception occurred during handshake.
+ * @throws SSLException If failed to process SSL data
+ */
+ private void renegotiateIfNeeded(SSLEngineResult res) throws IgniteCheckedException, SSLException {
+ if (res.getStatus() != CLOSED && res.getStatus() != BUFFER_UNDERFLOW
+ && res.getHandshakeStatus() != NOT_HANDSHAKING) {
+ // Renegotiation required.
+ handshakeStatus = res.getHandshakeStatus();
+
+ if (log.isDebugEnabled())
+ log.debug("Renegotiation requested [status=" + res.getStatus() + ", handshakeStatus = " +
+ handshakeStatus + ']');
+
+ handshakeFinished = false;
+
+ handshake();
+ }
+ }
+
+ /**
+ * Allocate application buffer.
+ */
+ private ByteBuffer allocateAppBuff() {
+ int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
+
+ int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);
+
+ return ByteBuffer.allocate(appBufSize);
+ }
+
+ /**
+ * Read data from net buffer.
+ */
+ private void readFromNet() {
+ try {
+ inNetBuf.clear();
+
+ ch.read(inNetBuf);
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Copies data from out net buffer and passes it to the underlying chain.
+ *
+ * @return Nothing.
+ * @throws GridNioException If send failed.
+ */
+ private void writeNetBuffer() throws IgniteCheckedException {
+ try {
+ ch.write(outNetBuf);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to write byte to socket.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index be8a4e8..a05135f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -393,6 +393,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
public static ByteBuffer expandBuffer(ByteBuffer original, int cap) {
ByteBuffer res = ByteBuffer.allocate(cap);
+ res.order(ByteOrder.nativeOrder());
+
original.flip();
res.put(original);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index ac22d74..dc3d870 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.*;
import static javax.net.ssl.SSLEngineResult.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
+import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
import static org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.*;
/**
@@ -96,7 +97,14 @@ class GridNioSslHandler extends ReentrantLock {
sslEngine = engine;
- sslEngine.beginHandshake();
+ if (ses.meta(SSL_ENGINE.ordinal()) == null)
+ sslEngine.beginHandshake();
+ else {
+ sslEngine = ses.meta(SSL_ENGINE.ordinal());
+
+ handshakeFinished = true;
+ initHandshakeComplete = true;
+ }
handshakeStatus = sslEngine.getHandshakeStatus();
@@ -114,6 +122,8 @@ class GridNioSslHandler extends ReentrantLock {
appBuf = ByteBuffer.allocate(appBufSize);
+ appBuf.order(ByteOrder.nativeOrder());
+
if (log.isDebugEnabled())
log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBufSize + ']');
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..52eeb65 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.ssl.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.util.ipc.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.nio.ssl.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
@@ -41,6 +43,7 @@ import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
+import javax.net.ssl.*;
import java.io.*;
import java.net.*;
import java.nio.*;
@@ -146,6 +149,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
+ /** Node attribute that is set if using SSL (value is <tt>comm.tcp.ssl</tt>). */
+ public static final String ATTR_SSL = "comm.tcp.ssl";
+
/** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
public static final String ATTR_ADDRS = "comm.tcp.addrs";
@@ -747,6 +753,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
};
/**
+ * @return {@code True} if ssl enabled.
+ */
+ private boolean isSslEnabled() {
+ return ignite.configuration().getSslContextFactory() != null;
+ }
+
+ /**
* Sets address resolver.
*
* @param addrRslvr Address resolver.
@@ -1298,7 +1311,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
createSpiAttributeName(ATTR_PORT), boundTcpPort,
createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null,
- createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+ createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs,
+ createSpiAttributeName(ATTR_SSL), isSslEnabled());
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
@@ -1465,6 +1479,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
} :
null;
+ GridNioFilter[] filters;
+
+ if (isSslEnabled()) {
+ GridNioSslFilter sslFilter =
+ new GridNioSslFilter(ignite.configuration().getSslContextFactory().createSslContext(), log);
+
+ sslFilter.directMode(true);
+
+ filters = new GridNioFilter[] {
+ new GridNioCodecFilter(parser, log, true),
+ new GridConnectionBytesVerifyFilter(log),
+ sslFilter
+ };
+ }
+ else
+ filters = new GridNioFilter[] {
+ new GridNioCodecFilter(parser, log, true),
+ new GridConnectionBytesVerifyFilter(log)
+ };
+
GridNioServer<Message> srvr =
GridNioServer.<Message>builder()
.address(locHost)
@@ -1482,8 +1516,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.directMode(true)
.metricsListener(metricsLsnr)
.writeTimeout(sockWriteTimeout)
- .filters(new GridNioCodecFilter(parser, log, true),
- new GridConnectionBytesVerifyFilter(log))
+ .filters(filters)
.messageFormatter(msgFormatter)
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
@@ -1510,6 +1543,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
onException("Failed to bind to local port (will try next port within range) [port=" + port +
", locHost=" + locHost + ']', e);
}
+ catch (SSLException e) {
+ throw new IgniteCheckedException("Failed to create SSL context. SSL factory: "
+ + ignite.configuration().getSslContextFactory() + '.', e);
+ }
}
// If free port wasn't found.
@@ -1872,7 +1909,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), connTimeout0);
+ safeHandshake(client, null, node.id(), connTimeout0, null);
}
catch (HandshakeTimeoutException e) {
if (log.isDebugEnabled())
@@ -2019,10 +2056,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
+ GridTuple<SSLEngine> ssl = new GridTuple<>();
+
try {
ch.socket().connect(addr, (int)connTimeout);
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl);
if (rcvCnt == -1)
return null;
@@ -2037,6 +2076,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
meta.put(NODE_ID_META, node.id());
+ if (isSslEnabled()) {
+ assert ssl != null;
+ assert ssl.get() != null;
+
+ meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get());
+ }
+
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
@@ -2161,6 +2207,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
+ * @param ssl SSL engine.
* @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
* @return Handshake response.
*/
@@ -2169,7 +2216,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
T client,
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
- long timeout
+ long timeout,
+ @Nullable GridTuple<SSLEngine> ssl
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
@@ -2186,15 +2234,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
boolean success = false;
try {
- ByteBuffer buf = ByteBuffer.allocate(17);
+ BlockingSslHandler sslHnd = null;
+
+ ByteBuffer buf;
+
+ if (isSslEnabled()) {
+ GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>();
+
+ SSLEngine sslEngine = ignite.configuration().getSslContextFactory()
+ .createSslContext().createSSLEngine();
+
+ sslEngine.setUseClientMode(true);
+
+ sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log);
+
+ if (!sslHnd.handshake())
+ throw new IgniteCheckedException("SSL handshake isn't completed.");
+
+ ssl.set(sslEngine);
+
+ ByteBuffer handBuff = handFut.get();
+
+ if (handBuff.limit() < 17) {
+ buf = ByteBuffer.allocate(1000);
+
+ int read = ch.read(buf);
+
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+
+ buf.flip();
+
+ buf = sslHnd.decode(buf);
+ }
+ else
+ buf = handBuff;
+ }
+ else {
+ buf = ByteBuffer.allocate(17);
- for (int i = 0; i < 17; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < 17; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
- i += read;
+ i += read;
+ }
}
UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
@@ -2205,7 +2291,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else if (log.isDebugEnabled())
log.debug("Received remote node ID: " + rmtNodeId0);
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ if (isSslEnabled() ) {
+ assert sslHnd != null;
+
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
if (recovery != null) {
HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
@@ -2225,30 +2317,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
buf.flip();
- ch.write(buf);
+ if (isSslEnabled()) {
+ assert sslHnd != null;
+
+ ch.write(sslHnd.encrypt(buf));
+ }
+ else
+ ch.write(buf);
+ }
+ else {
+ if (isSslEnabled()) {
+ assert sslHnd != null;
+
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
}
- else
- ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
if (recovery != null) {
if (log.isDebugEnabled())
log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
- buf = ByteBuffer.allocate(9);
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- buf.order(ByteOrder.nativeOrder());
+ buf = ByteBuffer.allocate(1000);
+
+ buf.order(ByteOrder.nativeOrder());
- for (int i = 0; i < 9; ) {
int read = ch.read(buf);
if (read == -1)
throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
"(connection closed).");
- i += read;
+ buf.flip();
+
+ rcvCnt = sslHnd.decode(buf).getLong(1);
}
+ else {
+ buf = ByteBuffer.allocate(9);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ for (int i = 0; i < 9; ) {
+ int read = ch.read(buf);
+
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- rcvCnt = buf.getLong(1);
+ i += read;
+ }
+
+ rcvCnt = buf.getLong(1);
+ }
if (log.isDebugEnabled())
log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index bfed977..d3a2521 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.communication;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -32,6 +33,7 @@ import org.apache.ignite.testframework.junits.spi.*;
import java.net.*;
import java.util.*;
import java.util.Map.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.IgniteNodeAttributes.*;
@@ -59,6 +61,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
/** */
private static final Object mux = new Object();
+ /** */
+ protected boolean useSsl = false;
+
/**
*
*/
@@ -181,6 +186,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
public void testSendToManyNodes() throws Exception {
msgDestMap.clear();
+ int cnt = 0;
+
// Send message from each SPI to all SPI's, including itself.
for (Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
UUID sndId = entry.getKey();
@@ -299,6 +306,15 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
rsrcs.inject(spi);
+ if (useSsl) {
+ IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
+
+ IgniteConfiguration cfg = ignite.configuration()
+ .setSslContextFactory(GridTestUtils.sslContextFactory());
+
+ ignite.setStaticCfg(cfg);
+ }
+
spi.setListener(new MessageListener(rsrcs.getNodeId()));
node.setAttributes(spi.getNodeAttributes());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
new file mode 100644
index 0000000..e5f8bb3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.testframework.junits.spi.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiSslSelfTest extends GridTcpCommunicationSpiAbstractTest {
+ /** */
+ public GridTcpCommunicationSpiSslSelfTest() {
+ super(false);
+
+ this.useSsl = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean tcpNoDelay() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 2451f59..1471faa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -52,6 +52,9 @@ public class IgniteMock implements Ignite {
/** */
private final String home;
+ /** */
+ private IgniteConfiguration staticCfg;
+
/**
* Mock values
*
@@ -84,6 +87,9 @@ public class IgniteMock implements Ignite {
/** {@inheritDoc} */
@Override public IgniteConfiguration configuration() {
+ if (staticCfg != null)
+ return staticCfg;
+
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setMarshaller(marshaller);
@@ -298,4 +304,11 @@ public class IgniteMock implements Ignite {
@Override public <K> Affinity<K> affinity(String cacheName) {
return null;
}
+
+ /**
+ * @param staticCfg Configuration.
+ */
+ public void setStaticCfg(IgniteConfiguration staticCfg) {
+ this.staticCfg = staticCfg;
+ }
}
[2/2] incubator-ignite git commit: Merge branch 'master' into
ignite-323
Posted by nt...@apache.org.
Merge branch 'master' into ignite-323
Conflicts:
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c8478452
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c8478452
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c8478452
Branch: refs/heads/ignite-323
Commit: c84784525a660a45f3326beac17303e2279f5e5b
Parents: 730b104 3194415
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jul 20 15:26:15 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Jul 20 15:26:15 2015 +0300
----------------------------------------------------------------------
assembly/LICENSE_FABRIC | 17 +-
assembly/LICENSE_HADOOP | 11 -
modules/apache-license-gen/pom.xml | 2 +-
.../src/main/resources/META-INF/licenses.txt.vm | 4 +-
modules/core/licenses/jsr166-license.txt | 3 +
modules/core/licenses/snaptree-bsd-license.txt | 2 +-
.../IgniteClientDisconnectedException.java | 61 +
.../java/org/apache/ignite/IgniteCluster.java | 8 +
.../ignite/compute/ComputeJobResultPolicy.java | 3 +-
.../apache/ignite/internal/GridComponent.java | 18 +
.../ignite/internal/GridJobSiblingImpl.java | 2 +-
.../ignite/internal/GridKernalContext.java | 5 +
.../ignite/internal/GridKernalContextImpl.java | 31 +-
.../ignite/internal/GridKernalGateway.java | 46 +-
.../ignite/internal/GridKernalGatewayImpl.java | 85 +-
.../apache/ignite/internal/GridKernalState.java | 3 +
.../ignite/internal/GridPluginComponent.java | 11 +
...gniteClientDisconnectedCheckedException.java | 49 +
.../apache/ignite/internal/IgniteKernal.java | 234 +++-
.../cluster/IgniteClusterAsyncImpl.java | 5 +
.../internal/cluster/IgniteClusterImpl.java | 18 +
.../internal/managers/GridManagerAdapter.java | 19 +-
.../deployment/GridDeploymentCommunication.java | 2 +-
.../deployment/GridDeploymentManager.java | 95 +-
.../discovery/GridDiscoveryManager.java | 163 ++-
.../failover/GridFailoverContextImpl.java | 28 +-
.../managers/failover/GridFailoverManager.java | 13 +-
.../processors/GridProcessorAdapter.java | 11 +
.../affinity/GridAffinityAssignmentCache.java | 26 +-
.../cache/CacheOsConflictResolutionManager.java | 6 +
.../cache/DynamicCacheChangeBatch.java | 17 +
.../processors/cache/GridCacheAdapter.java | 27 +-
.../cache/GridCacheAffinityManager.java | 21 +-
.../cache/GridCacheConcurrentMap.java | 15 +-
.../processors/cache/GridCacheContext.java | 45 +-
.../processors/cache/GridCacheGateway.java | 116 +-
.../processors/cache/GridCacheIoManager.java | 8 +
.../processors/cache/GridCacheManager.java | 6 +
.../cache/GridCacheManagerAdapter.java | 6 +
.../processors/cache/GridCacheMvccManager.java | 43 +-
.../GridCachePartitionExchangeManager.java | 134 +-
.../processors/cache/GridCachePreloader.java | 5 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../processors/cache/GridCacheProcessor.java | 311 ++++-
.../cache/GridCacheSharedContext.java | 113 +-
.../cache/GridCacheSharedManager.java | 11 +-
.../cache/GridCacheSharedManagerAdapter.java | 20 +-
.../processors/cache/GridCacheUtils.java | 9 +
.../processors/cache/IgniteCacheFutureImpl.java | 5 +
.../processors/cache/IgniteCacheProxy.java | 4 +-
.../processors/cache/IgniteInternalCache.java | 1 +
.../CacheDataStructuresManager.java | 35 +
.../distributed/GridCacheTxFinishSync.java | 46 +
.../distributed/dht/GridDhtCacheAdapter.java | 14 +-
.../dht/GridDhtPartitionTopologyImpl.java | 24 +
.../distributed/dht/GridDhtTopologyFuture.java | 14 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 9 +-
.../dht/GridPartitionedGetFuture.java | 13 +-
.../dht/atomic/GridDhtAtomicCache.java | 4 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 11 +-
.../GridDhtPartitionsExchangeFuture.java | 31 +-
.../dht/preloader/GridDhtPreloader.java | 16 +-
.../distributed/near/GridNearCacheAdapter.java | 8 +
.../distributed/near/GridNearGetFuture.java | 13 +-
.../cache/dr/GridOsCacheDrManager.java | 7 +-
.../query/GridCacheDistributedQueryManager.java | 22 +
.../cache/query/GridCacheQueryAdapter.java | 11 +-
.../query/GridCacheQueryFutureAdapter.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 5 +
.../transactions/IgniteTransactionsImpl.java | 59 +-
.../cache/transactions/IgniteTxManager.java | 23 +-
.../transactions/TransactionProxyImpl.java | 2 +-
.../cache/version/GridCacheVersionManager.java | 9 +-
.../clock/GridClockSyncProcessor.java | 6 +-
.../processors/closure/AffinityTask.java | 35 +
.../closure/GridClosureProcessor.java | 63 +-
.../processors/cluster/ClusterProcessor.java | 11 +
.../continuous/GridContinuousHandler.java | 9 +-
.../continuous/GridContinuousProcessor.java | 127 +-
.../datastreamer/DataStreamProcessor.java | 24 +-
.../datastreamer/DataStreamerImpl.java | 90 +-
.../datastructures/DataStructuresProcessor.java | 33 +-
.../datastructures/GridCacheAtomicLongImpl.java | 33 +-
.../GridCacheAtomicReferenceImpl.java | 34 +-
.../GridCacheAtomicSequenceImpl.java | 33 +-
.../GridCacheAtomicStampedImpl.java | 33 +-
.../GridCacheCountDownLatchImpl.java | 51 +-
.../datastructures/GridCacheRemovable.java | 6 +-
.../datastructures/GridCacheSetImpl.java | 15 +-
.../datastructures/GridCacheSetProxy.java | 47 +-
.../processors/job/GridJobProcessor.java | 2 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../processors/query/GridQueryIndexing.java | 7 +
.../processors/query/GridQueryProcessor.java | 6 +
.../service/GridServiceProcessor.java | 45 +-
.../processors/service/GridServiceProxy.java | 13 +-
.../processors/task/GridTaskProcessor.java | 55 +-
.../processors/task/GridTaskWorker.java | 83 +-
.../ignite/internal/util/IgniteUtils.java | 30 +-
.../shmem/IpcSharedMemoryClientEndpoint.java | 5 +-
.../ignite/internal/util/lang/GridFunc.java | 2 +
.../plugin/security/SecurityPermission.java | 7 +-
.../java/org/apache/ignite/spi/IgniteSpi.java | 15 +
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 51 +-
.../communication/tcp/TcpCommunicationSpi.java | 358 ++++--
.../spi/discovery/DiscoverySpiDataExchange.java | 3 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 408 ++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 177 ++-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 9 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 33 +-
.../tcp/internal/TcpDiscoveryNode.java | 19 +
.../messages/TcpDiscoveryAbstractMessage.java | 3 +
.../messages/TcpDiscoveryClientAckResponse.java | 64 +
.../messages/TcpDiscoveryHandshakeResponse.java | 14 +
.../ignite/spi/failover/FailoverContext.java | 18 +
.../spi/failover/always/AlwaysFailoverSpi.java | 25 +
.../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +-
.../internal/GridUpdateNotifierSelfTest.java | 15 +-
.../IgniteClientReconnectAbstractTest.java | 363 ++++++
.../IgniteClientReconnectApiExceptionTest.java | 846 ++++++++++++
.../IgniteClientReconnectAtomicsTest.java | 672 ++++++++++
.../IgniteClientReconnectCacheTest.java | 1202 ++++++++++++++++++
.../IgniteClientReconnectCollectionsTest.java | 443 +++++++
.../IgniteClientReconnectComputeTest.java | 192 +++
...eClientReconnectContinuousProcessorTest.java | 372 ++++++
...IgniteClientReconnectDiscoveryStateTest.java | 123 ++
...niteClientReconnectFailoverAbstractTest.java | 231 ++++
.../IgniteClientReconnectFailoverTest.java | 227 ++++
.../IgniteClientReconnectServicesTest.java | 260 ++++
.../internal/IgniteClientReconnectStopTest.java | 106 ++
.../IgniteClientReconnectStreamerTest.java | 233 ++++
.../IgniteSlowClientDetectionSelfTest.java | 1 +
.../GridDeploymentManagerStopSelfTest.java | 7 +
.../cache/CacheAffinityCallSelfTest.java | 172 +++
.../cache/GridCacheAffinityRoutingSelfTest.java | 157 ++-
.../IgniteCacheAbstractStopBusySelfTest.java | 2 +-
.../cache/IgniteCacheDynamicStopSelfTest.java | 6 +-
.../cache/IgniteCacheNearLockValueSelfTest.java | 2 +
.../IgniteTxExceptionAbstractSelfTest.java | 1 +
.../distributed/IgniteCache150ClientsTest.java | 1 +
.../IgniteCacheClientReconnectTest.java | 175 +++
.../IgniteCacheServerNodeConcurrentStart.java | 96 ++
.../IgniteCacheSystemTransactionsSelfTest.java | 2 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 52 +-
...gniteCachePutRetryTransactionalSelfTest.java | 17 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 4 +-
.../GridCacheReplicatedInvalidateSelfTest.java | 3 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 562 +++++++-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 140 +-
.../spi/failover/GridFailoverTestContext.java | 10 +
.../testframework/junits/GridAbstractTest.java | 5 +
.../multijvm/IgniteClusterProcessProxy.java | 5 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 1 +
.../testsuites/IgniteCacheTestSuite2.java | 2 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 -
.../testsuites/IgniteClientNodesTestSuite.java | 42 +
.../IgniteClientReconnectTestSuite.java | 48 +
.../processors/query/h2/IgniteH2Indexing.java | 5 +
.../query/h2/twostep/GridMergeIndex.java | 45 +-
.../h2/twostep/GridReduceQueryExecutor.java | 70 +-
...ClientReconnectCacheQueriesFailoverTest.java | 225 ++++
.../cache/IgniteClientReconnectQueriesTest.java | 427 +++++++
...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 +
.../IgniteCacheWithIndexingTestSuite.java | 1 +
modules/jta/licenses/jta-license.txt | 2 +
.../ignite/schema/ui/SchemaImportApp.java | 36 +-
modules/yarn/pom.xml | 4 +-
pom.xml | 2 -
169 files changed, 10975 insertions(+), 1037 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c8478452/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c8478452/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 52eeb65,e9fd696..29aa3e2
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@@ -2291,16 -2261,10 +2347,16 @@@ public class TcpCommunicationSpi extend
else if (log.isDebugEnabled())
log.debug("Received remote node ID: " + rmtNodeId0);
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ if (isSslEnabled() ) {
+ assert sslHnd != null;
+
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
+ HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
recovery.incrementConnectCount(),
recovery.receivedCount());
@@@ -2317,23 -2281,10 +2373,23 @@@
buf.flip();
- ch.write(buf);
+ if (isSslEnabled()) {
+ assert sslHnd != null;
+
+ ch.write(sslHnd.encrypt(buf));
+ }
+ else
+ ch.write(buf);
+ }
+ else {
+ if (isSslEnabled()) {
+ assert sslHnd != null;
+
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)));
++ ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+ }
+ else
- ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
++ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
}
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
if (recovery != null) {
if (log.isDebugEnabled())