You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:39 UTC

[18/28] incubator-ignite git commit: ignite-545: merge from sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 08a9937..733ae81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -176,9 +176,11 @@ public class GridNioRecoveryDescriptor {
         while (acked < rcvCnt) {
             GridNioFuture<?> fut = msgFuts.pollFirst();
 
-            assert fut != null;
+            assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
+                ", acked=" + acked +
+                ", desc=" + this + ']';
 
-            assert fut.isDone();
+            assert fut.isDone() : fut;
 
             acked++;
         }
@@ -239,9 +241,12 @@ public class GridNioRecoveryDescriptor {
      * @param rcvCnt Number of messages received by remote node.
      */
     public void onHandshake(long rcvCnt) {
-        ackReceived(rcvCnt);
+        synchronized (this) {
+            if (!nodeLeft)
+                ackReceived(rcvCnt);
 
-        resendCnt = msgFuts.size();
+            resendCnt = msgFuts.size();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
deleted file mode 100644
index 72c20f8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Grid client for NIO server.
- */
-public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
-    /** Socket. */
-    private final Socket sock;
-
-    /** Output stream. */
-    private final UnsafeBufferedOutputStream out;
-
-    /** Minimum buffered message count. */
-    private final int minBufferedMsgCnt;
-
-    /** Communication buffer size ratio. */
-    private final double bufSizeRatio;
-
-    /** */
-    private final ByteBuffer writeBuf;
-
-    /** */
-    private final MessageFormatter formatter;
-
-    /**
-     * @param metricsLsnr Metrics listener.
-     * @param addr Address.
-     * @param locHost Local address.
-     * @param connTimeout Connect timeout.
-     * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
-     * @param sockRcvBuf Socket receive buffer.
-     * @param sockSndBuf Socket send buffer.
-     * @param bufSize Buffer size (or {@code 0} to disable buffer).
-     * @param minBufferedMsgCnt Minimum buffered message count.
-     * @param bufSizeRatio Communication buffer size ratio.
-     * @param formatter Message formatter.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridTcpCommunicationClient(
-        GridNioMetricsListener metricsLsnr,
-        InetSocketAddress addr,
-        InetAddress locHost,
-        long connTimeout,
-        boolean tcpNoDelay,
-        int sockRcvBuf,
-        int sockSndBuf,
-        int bufSize,
-        int minBufferedMsgCnt,
-        double bufSizeRatio,
-        MessageFormatter formatter
-    ) throws IgniteCheckedException {
-        super(metricsLsnr);
-
-        assert metricsLsnr != null;
-        assert addr != null;
-        assert locHost != null;
-        assert connTimeout >= 0;
-        assert bufSize >= 0;
-
-        A.ensure(minBufferedMsgCnt >= 0,
-            "Value of minBufferedMessageCount property cannot be less than zero.");
-        A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
-            "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
-
-        this.minBufferedMsgCnt = minBufferedMsgCnt;
-        this.bufSizeRatio = bufSizeRatio;
-        this.formatter = formatter;
-
-        writeBuf = ByteBuffer.allocate(8 << 10);
-
-        writeBuf.order(ByteOrder.nativeOrder());
-
-        sock = new Socket();
-
-        boolean success = false;
-
-        try {
-            sock.bind(new InetSocketAddress(locHost, 0));
-
-            sock.setTcpNoDelay(tcpNoDelay);
-
-            if (sockRcvBuf > 0)
-                sock.setReceiveBufferSize(sockRcvBuf);
-
-            if (sockSndBuf > 0)
-                sock.setSendBufferSize(sockSndBuf);
-
-            sock.connect(addr, (int)connTimeout);
-
-            out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
-
-            success = true;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to connect to remote host " +
-                "[addr=" + addr + ", localHost=" + locHost + ']', e);
-        }
-        finally {
-            if (!success)
-                U.closeQuiet(sock);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
-        try {
-            handshakeC.applyx(sock.getInputStream(), sock.getOutputStream());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " +
-                sock.getRemoteSocketAddress(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        boolean res = super.close();
-
-        if (res) {
-            U.closeQuiet(out);
-            U.closeQuiet(sock);
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        super.forceClose();
-
-        try {
-            out.flush();
-        }
-        catch (IOException ignored) {
-            // No-op.
-        }
-
-        // Do not call (directly or indirectly) out.close() here
-        // since it may cause a deadlock.
-        out.forceClose();
-
-        U.closeQuiet(sock);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        try {
-            out.write(data, 0, len);
-
-            metricsLsnr.onBytesSent(len);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
-        throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        assert writeBuf.hasArray();
-
-        try {
-            int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer());
-
-            metricsLsnr.onBytesSent(cnt);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-
-        return false;
-    }
-
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        assert timeout > 0;
-
-        out.flushOnTimeout(timeout);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTcpCommunicationClient.class, this, super.toString());
-    }
-
-    /**
-     *
-     */
-    private class UnsafeBufferedOutputStream extends FilterOutputStream {
-        /** The internal buffer where data is stored. */
-        private final byte buf[];
-
-        /** Current size. */
-        private int size;
-
-        /** Count. */
-        private int cnt;
-
-        /** Message count. */
-        private int msgCnt;
-
-        /** Total messages size. */
-        private int totalCnt;
-
-        /** Lock. */
-        private final ReentrantLock lock = new ReentrantLock();
-
-        /** Last flushed timestamp. */
-        private volatile long lastFlushed = U.currentTimeMillis();
-
-        /** Cached flush timeout. */
-        private volatile long flushTimeout;
-
-        /** Buffer adjusted timestamp. */
-        private long lastAdjusted = U.currentTimeMillis();
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream.
-         *
-         * @param out The underlying output stream.
-         */
-        UnsafeBufferedOutputStream(OutputStream out) {
-            this(out, 8192);
-        }
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream with the specified buffer
-         * size.
-         *
-         * @param out The underlying output stream.
-         * @param size The buffer size.
-         */
-        UnsafeBufferedOutputStream(OutputStream out, int size) {
-            super(out);
-
-            assert size >= 0;
-
-            this.size = size;
-            buf = size > 0 ? new byte[size] : null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(int b) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            assert b != null;
-            assert off == 0;
-
-            // No buffering.
-            if (buf == null) {
-                lock.lock();
-
-                try {
-                    out.write(b, 0, len);
-                }
-                finally {
-                    lock.unlock();
-                }
-
-                return;
-            }
-
-            // Buffering is enabled.
-            lock.lock();
-
-            try {
-                msgCnt++;
-                totalCnt += len;
-
-                if (len >= size) {
-                    flushLocked();
-
-                    out.write(b, 0, len);
-
-                    lastFlushed = U.currentTimeMillis();
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                if (cnt + len > size) {
-                    flushLocked();
-
-                    messageToBuffer0(b, off, len, buf, 0);
-
-                    cnt = len;
-
-                    assert cnt < size;
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                messageToBuffer0(b, 0, len, buf, cnt);
-
-                cnt += len;
-
-                if (cnt == size)
-                    flushLocked();
-                else
-                    flushIfNeeded();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushIfNeeded() throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                flushOnTimeoutLocked(flushTimeout0);
-        }
-
-        /**
-         *
-         */
-        private void adjustBufferIfNeeded() {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                adjustBufferLocked(flushTimeout0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void flush() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        public void flushOnTimeout(long timeout) throws IOException {
-            assert buf != null;
-            assert timeout > 0;
-
-            // Overwrite cached value.
-            flushTimeout = timeout;
-
-            if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock())
-                return;
-
-            try {
-                flushOnTimeoutLocked(timeout);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        private void flushOnTimeoutLocked(long timeout) throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            // Double check.
-            if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
-                return;
-
-            flushLocked();
-
-            adjustBufferLocked(timeout);
-        }
-
-        /**
-         * @param timeout Timeout.
-         */
-        private void adjustBufferLocked(long timeout) {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            long time = U.currentTimeMillis();
-
-            if (lastAdjusted + timeout < time) {
-                if (msgCnt <= minBufferedMsgCnt)
-                    size = 0;
-                else {
-                    size = (int)(totalCnt * bufSizeRatio);
-
-                    if (size > buf.length)
-                        size = buf.length;
-                }
-
-                msgCnt = 0;
-                totalCnt = 0;
-
-                lastAdjusted = time;
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushLocked() throws IOException {
-            assert lock.isHeldByCurrentThread();
-
-            if (buf != null && cnt > 0) {
-                out.write(buf, 0, cnt);
-
-                cnt = 0;
-            }
-
-            out.flush();
-
-            lastFlushed = U.currentTimeMillis();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                try {
-                    out.close();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        /**
-         * Forcibly closes underlying stream ignoring any possible exception.
-         */
-        public void forceClose() {
-            try {
-                out.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-        }
-
-        /**
-         * @param b Buffer to copy from.
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) {
-            assert b.length == len;
-            assert off == 0;
-            assert resBuf.length >= resOff + len + 4;
-
-            U.intToBytes(len, resBuf, resOff);
-
-            U.arrayCopy(b, off, resBuf, resOff + 4, len);
-        }
-
-        /**
-         * @param b Buffer to copy from (length included).
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) {
-            assert off == 0;
-            assert resBuf.length >= resOff + len;
-
-            U.arrayCopy(b, off, resBuf, resOff, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            lock.lock();
-
-            try {
-                return S.toString(UnsafeBufferedOutputStream.class, this);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 788a8e6..abad875 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
         return false;
     }
 
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public boolean async() {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index bd24ecf..9e15d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -202,7 +202,7 @@ public class VisorCache implements Serializable {
         offHeapAllocatedSize = ca.offHeapAllocatedSize();
         offHeapEntriesCnt = ca.offHeapEntriesCount();
         partitions = ca.affinity().partitions();
-        metrics = VisorCacheMetrics.from(ignite, ca);
+        metrics = VisorCacheMetrics.from(ignite, cacheName);
 
         estimateMemorySize(ignite, ca, sample);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java
index ef12424..c8913c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java
@@ -45,17 +45,17 @@ public class VisorCacheConfigurationCollectorJob
 
     /** {@inheritDoc} */
     @Override protected Map<IgniteUuid, VisorCacheConfiguration> run(Collection<IgniteUuid> arg) {
-        Collection<GridCacheAdapter<?, ?>> caches = ignite.context().cache().internalCaches();
+        Collection<IgniteCacheProxy<?, ?>> caches = ignite.context().cache().jcaches();
 
         boolean all = arg == null || arg.isEmpty();
 
         Map<IgniteUuid, VisorCacheConfiguration> res = U.newHashMap(caches.size());
 
-        for (GridCacheAdapter<?, ?> cache : caches) {
+        for (IgniteCacheProxy<?, ?> cache : caches) {
             IgniteUuid deploymentId = cache.context().dynamicDeploymentId();
 
             if (all || arg.contains(deploymentId))
-                res.put(deploymentId, config(cache.configuration()));
+                res.put(deploymentId, config(cache.getConfiguration(CacheConfiguration.class)));
         }
 
         return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index 30be424..c5d70a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.visor.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -166,19 +167,21 @@ public class VisorCacheMetrics implements Serializable {
 
     /**
      * @param ignite Ignite.
-     * @param c Cache.
+     * @param cacheName Cache name.
      * @return Data transfer object for given cache metrics.
      */
-    public static VisorCacheMetrics from(IgniteEx ignite, IgniteInternalCache c) {
+    public static VisorCacheMetrics from(IgniteEx ignite, String cacheName) {
         VisorCacheMetrics cm = new VisorCacheMetrics();
 
-        CacheMetrics m = c.metrics();
-
         GridCacheProcessor cacheProcessor = ignite.context().cache();
 
-        cm.name = c.name();
-        cm.mode = cacheProcessor.cacheMode(c.name());
-        cm.sys = cacheProcessor.systemCache(c.name());
+        IgniteCache<Object, Object> c = cacheProcessor.jcache(cacheName);
+
+        cm.name = cacheName;
+        cm.mode = cacheProcessor.cacheMode(cacheName);
+        cm.sys = cacheProcessor.systemCache(cacheName);
+
+        CacheMetrics m = c.metrics();
 
         cm.size = m.getSize();
         cm.keySize = m.getKeySize();
@@ -208,7 +211,7 @@ public class VisorCacheMetrics implements Serializable {
         cm.commitsPerSec = perSecond(m.getAverageTxCommitTime());
         cm.rollbacksPerSec = perSecond(m.getAverageTxRollbackTime());
 
-        cm.qryMetrics = VisorCacheQueryMetrics.from(c.context().queries().metrics());
+        cm.qryMetrics = VisorCacheQueryMetrics.from(c.queryMetrics());
 
         cm.dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize();
         cm.txThreadMapSize = m.getTxThreadMapSize();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java
index 8fd42a0..23263c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java
@@ -99,17 +99,19 @@ public class VisorCacheMetricsCollectorTask extends VisorMultiNodeTask<IgniteBiT
 
             GridCacheProcessor cacheProcessor = ignite.context().cache();
 
-            Collection<GridCacheAdapter<?, ?>> caches = cacheProcessor.internalCaches();
+            Collection<IgniteCacheProxy<?, ?>> caches = cacheProcessor.jcaches();
 
             Collection<VisorCacheMetrics> res = new ArrayList<>(caches.size());
 
             boolean allCaches = cacheNames.isEmpty();
 
-            for (GridCacheAdapter ca : caches) {
+            for (IgniteCacheProxy ca : caches) {
                 if (ca.context().started()) {
-                    VisorCacheMetrics cm = VisorCacheMetrics.from(ignite, ca);
+                    String cacheName = ca.getName();
 
-                    if ((allCaches || cacheNames.contains(ca.name())) && (showSysCaches || !cm.system()))
+                    VisorCacheMetrics cm = VisorCacheMetrics.from(ignite, cacheName);
+
+                    if ((allCaches || cacheNames.contains(cacheName)) && (showSysCaches || !cm.system()))
                         res.add(cm);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java
index 06dbfbf..ab24a3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.visor.cache;
 
-import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.*;
 import org.apache.ignite.configuration.*;
@@ -72,10 +71,10 @@ public class VisorCacheStoreConfiguration implements Serializable {
      * @param ccfg Cache configuration.
      * @return Data transfer object for cache store configuration properties.
      */
-    public static VisorCacheStoreConfiguration from(Ignite ignite, CacheConfiguration ccfg) {
+    public static VisorCacheStoreConfiguration from(IgniteEx ignite, CacheConfiguration ccfg) {
         VisorCacheStoreConfiguration cfg = new VisorCacheStoreConfiguration();
 
-        GridCacheAdapter<Object, Object> c = ((IgniteKernal)ignite).internalCache(ccfg.getName());
+        IgniteCacheProxy<Object, Object> c = ignite.context().cache().jcache(ccfg.getName());
 
         CacheStore store = c != null && c.context().started() ? c.context().store().configuredStore() : null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
index fde871b..3b2d45c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -87,7 +88,7 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData
                 else {
                     // Ignore nodes that left topology.
                     if (!(unhandledEx instanceof ClusterGroupEmptyException))
-                        taskRes.unhandledEx().put(nid, unhandledEx);
+                        taskRes.unhandledEx().put(nid, new VisorExceptionWrapper(unhandledEx));
                 }
             }
         }
@@ -116,13 +117,13 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData
             taskRes.events().addAll(jobRes.events());
 
         if (jobRes.eventsEx() != null)
-            taskRes.eventsEx().put(nid, jobRes.eventsEx());
+            taskRes.eventsEx().put(nid, new VisorExceptionWrapper(jobRes.eventsEx()));
 
         if (!jobRes.caches().isEmpty())
             taskRes.caches().put(nid, jobRes.caches());
 
         if (jobRes.cachesEx() != null)
-            taskRes.cachesEx().put(nid, jobRes.cachesEx());
+            taskRes.cachesEx().put(nid, new VisorExceptionWrapper(jobRes.cachesEx()));
 
         if (!jobRes.igfss().isEmpty())
             taskRes.igfss().put(nid, jobRes.igfss());
@@ -131,6 +132,6 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData
             taskRes.igfsEndpoints().put(nid, jobRes.igfsEndpoints());
 
         if (jobRes.igfssEx() != null)
-            taskRes.igfssEx().put(nid, jobRes.igfssEx());
+            taskRes.igfssEx().put(nid, new VisorExceptionWrapper(jobRes.igfssEx()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
index 6485978..1a4eb02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.visor.node;
 import org.apache.ignite.internal.visor.cache.*;
 import org.apache.ignite.internal.visor.event.*;
 import org.apache.ignite.internal.visor.igfs.*;
+import org.apache.ignite.internal.visor.util.*;
 
 import java.io.*;
 import java.util.*;
@@ -32,7 +33,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** Unhandled exceptions from nodes. */
-    private final Map<UUID, Throwable> unhandledEx = new HashMap<>();
+    private final Map<UUID, VisorExceptionWrapper> unhandledEx = new HashMap<>();
 
     /** Nodes grid names. */
     private final Map<UUID, String> gridNames = new HashMap<>();
@@ -50,13 +51,13 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     private final List<VisorGridEvent> evts = new ArrayList<>();
 
     /** Exceptions caught during collecting events from nodes. */
-    private final Map<UUID, Throwable> evtsEx = new HashMap<>();
+    private final Map<UUID, VisorExceptionWrapper> evtsEx = new HashMap<>();
 
     /** All caches collected from nodes. */
     private final Map<UUID, Collection<VisorCache>> caches = new HashMap<>();
 
     /** Exceptions caught during collecting caches from nodes. */
-    private final Map<UUID, Throwable> cachesEx = new HashMap<>();
+    private final Map<UUID, VisorExceptionWrapper> cachesEx = new HashMap<>();
 
     /** All IGFS collected from nodes. */
     private final Map<UUID, Collection<VisorIgfs>> igfss = new HashMap<>();
@@ -65,7 +66,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     private final Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints = new HashMap<>();
 
     /** Exceptions caught during collecting IGFS from nodes. */
-    private final Map<UUID, Throwable> igfssEx = new HashMap<>();
+    private final Map<UUID, VisorExceptionWrapper> igfssEx = new HashMap<>();
 
     /**
      * @return {@code true} If no data was collected.
@@ -88,7 +89,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     /**
      * @return Unhandled exceptions from nodes.
      */
-    public Map<UUID, Throwable> unhandledEx() {
+    public Map<UUID, VisorExceptionWrapper> unhandledEx() {
         return unhandledEx;
     }
 
@@ -123,7 +124,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     /**
      * @return Exceptions caught during collecting events from nodes.
      */
-    public Map<UUID, Throwable> eventsEx() {
+    public Map<UUID, VisorExceptionWrapper> eventsEx() {
         return evtsEx;
     }
 
@@ -137,7 +138,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     /**
      * @return Exceptions caught during collecting caches from nodes.
      */
-    public Map<UUID, Throwable> cachesEx() {
+    public Map<UUID, VisorExceptionWrapper> cachesEx() {
         return cachesEx;
     }
 
@@ -158,7 +159,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     /**
      * @return Exceptions caught during collecting IGFS from nodes.
      */
-    public Map<UUID, Throwable> igfssEx() {
+    public Map<UUID, VisorExceptionWrapper> igfssEx() {
         return igfssEx;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
index 8b39d09..9fc1cc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -83,12 +84,21 @@ public class VisorNodeSuppressedErrorsTask extends VisorMultiNodeTask<Map<UUID,
 
             List<IgniteExceptionRegistry.ExceptionInfo> errors = ignite.context().exceptionRegistry().getErrors(order);
 
+            List<IgniteExceptionRegistry.ExceptionInfo> wrapped = new ArrayList<>(errors.size());
+
             for (IgniteExceptionRegistry.ExceptionInfo error : errors) {
                 if (error.order() > order)
                     order = error.order();
+
+                wrapped.add(new IgniteExceptionRegistry.ExceptionInfo(error.order(),
+                    new VisorExceptionWrapper(error.error()),
+                    error.message(),
+                    error.threadId(),
+                    error.threadName(),
+                    error.time()));
             }
 
-            return new IgniteBiTuple<>(order, errors);
+            return new IgniteBiTuple<>(order, wrapped);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index 4a9daad..e977d2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 
 import javax.cache.*;
@@ -36,7 +37,7 @@ import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*;
 /**
  * Job for execute SCAN or SQL query and get first page of results.
  */
-public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? extends Exception, VisorQueryResultEx>> {
+public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx>> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -61,11 +62,11 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteBiTuple<? extends Exception, VisorQueryResultEx> run(VisorQueryArg arg) {
+    @Override protected IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx> run(VisorQueryArg arg) {
         try {
             UUID nid = ignite.localNode().id();
 
-            boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN");
+            boolean scan = arg.queryTxt() == null;
 
             String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" +
                 UUID.randomUUID();
@@ -110,8 +111,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
                 Collection<GridQueryFieldMetadata> meta = cur.fieldsMeta();
 
                 if (meta == null)
-                    return new IgniteBiTuple<Exception, VisorQueryResultEx>(
-                        new SQLException("Fail to execute query. No metadata available."), null);
+                    return new IgniteBiTuple<>(
+                        new VisorExceptionWrapper(new SQLException("Fail to execute query. No metadata available.")), null);
                 else {
                     List<VisorQueryField> names = new ArrayList<>(meta.size());
 
@@ -138,7 +139,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
             }
         }
         catch (Exception e) {
-            return new IgniteBiTuple<>(e, null);
+            return new IgniteBiTuple<>(VisorTaskUtils.wrap(e), null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 4f2fda5..98c876a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.visor.query;
 
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 
 /**
  * Task for execute SCAN or SQL query and get first page of results.
  */
 @GridInternal
-public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTuple<? extends Exception, VisorQueryResultEx>> {
+public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx>> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
new file mode 100644
index 0000000..d2ae0e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.util;
+
+/**
+ * Exception wrapper for safe for transferring to Visor.
+ */
+public class VisorExceptionWrapper extends Throwable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Detail message string of this throwable */
+    private String detailMsg;
+
+    /** Simple class name of base throwable object. */
+    private String clsSimpleName;
+
+    /** Class name of base throwable object. */
+    private String clsName;
+
+    /**
+     * Wrap throwable by presented on Visor throwable object.
+     *
+     * @param cause Base throwable object.
+     */
+    public VisorExceptionWrapper(Throwable cause) {
+        assert cause != null;
+
+        clsSimpleName = cause.getClass().getSimpleName();
+        clsName = cause.getClass().getName();
+
+        detailMsg = cause.getMessage();
+
+        StackTraceElement[] stackTrace = cause.getStackTrace();
+
+        if (stackTrace != null)
+            setStackTrace(stackTrace);
+
+        if (cause.getCause() != null)
+            initCause(new VisorExceptionWrapper(cause.getCause()));
+    }
+
+    /**
+     * @return Class simple name of base throwable object.
+     */
+    public String getClassSimpleName() {
+        return clsSimpleName;
+    }
+
+    /**
+     * @return Class name of base throwable object.
+     */
+    public String getClassName() {
+        return clsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getMessage() {
+        return detailMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return (detailMsg != null) ? (clsName + ": " + detailMsg) : clsName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index e8ae76d..b0afbc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -867,4 +867,14 @@ public class VisorTaskUtils {
 
         return bos.toByteArray();
     }
+
+    /**
+     * Wrap throwable object of any type to presented on Visor throwable object.
+     *
+     * @param e Base throwable object.
+     * @return Wrapped throwable object.
+     */
+    public static VisorExceptionWrapper wrap(Throwable e) {
+        return new VisorExceptionWrapper(e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
index 2ad07b5..5cdc72f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
@@ -100,14 +100,94 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean,
     public long getOverflowSize();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Number of gets from off-heap memory.")
+    public long getOffHeapGets();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of puts to off-heap memory.")
+    public long getOffHeapPuts();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of removed entries from off-heap memory.")
+    public long getOffHeapRemovals();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of evictions from off-heap memory.")
+    public long getOffHeapEvictions();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of hits on off-heap memory.")
+    public long getOffHeapHits();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of hits on off-heap memory.")
+    public float getOffHeapHitPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of misses on off-heap memory.")
+    public long getOffHeapMisses();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of misses on off-heap memory.")
+    public float getOffHeapMissPercentage();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Number of entries stored in off-heap memory.")
     public long getOffHeapEntriesCount();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Number of primary entries stored in off-heap memory.")
+    public long getOffHeapPrimaryEntriesCount();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of backup stored in off-heap memory.")
+    public long getOffHeapBackupEntriesCount();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Memory size allocated in off-heap.")
     public long getOffHeapAllocatedSize();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Off-heap memory maximum size.")
+    public long getOffHeapMaxSize();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of gets from swap.")
+    public long getSwapGets();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of puts to swap.")
+    public long getSwapPuts();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of removed entries from swap.")
+    public long getSwapRemovals();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of hits on swap.")
+    public long getSwapHits();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of misses on swap.")
+    public long getSwapMisses();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of hits on swap.")
+    public float getSwapHitPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of misses on swap.")
+    public float getSwapMissPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of entries stored in swap.")
+    public long getSwapEntriesCount();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Size of swap.")
+    public long getSwapSize();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Number of non-null values in the cache.")
     public int getSize();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
index 17bbc36..f064fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java
@@ -19,13 +19,22 @@ package org.apache.ignite.plugin;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
 
 /**
- * Pluggable ignite component.
+ * Pluggable Ignite component.
+ * <p>
+ * Ignite plugins are loaded using JDK {@link ServiceLoader}.
+ * First method called to initialize plugin is {@link PluginProvider#initExtensions(PluginContext, ExtensionRegistry)}.
+ * If plugin requires configuration it can be set in {@link IgniteConfiguration} using
+ * {@link IgniteConfiguration#setPluginConfigurations(PluginConfiguration...)}.
+ *
+ * @see IgniteConfiguration#setPluginConfigurations(PluginConfiguration...)
+ * @see PluginContext
  */
 public interface PluginProvider<C extends PluginConfiguration> {
     /**
@@ -49,18 +58,21 @@ public interface PluginProvider<C extends PluginConfiguration> {
     public <T extends IgnitePlugin> T plugin();
 
     /**
+     * Registers extensions.
+     *
      * @param ctx Plugin context.
-     * @param cls Ignite component class.
-     * @return Ignite component or {@code null} if component is not supported.
+     * @param registry Extension registry.
      */
-    @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls);
+    public void initExtensions(PluginContext ctx, ExtensionRegistry registry);
 
     /**
-     * Register extensions.
+     * Creates Ignite component.
+     *
      * @param ctx Plugin context.
-     * @param registry Extension registry.
+     * @param cls Ignite component class.
+     * @return Ignite component or {@code null} if component is not supported.
      */
-    public void initExtensions(PluginContext ctx, ExtensionRegistry registry);
+    @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls);
 
     /**
      * Starts grid component.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 871512c..6e7a706 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -23,13 +23,14 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.swapspace.*;
+
 import org.jetbrains.annotations.*;
 
 import javax.management.*;
@@ -197,7 +198,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
      * Inject ignite instance.
      */
     @IgniteInstanceResource
-    protected void injectResources(Ignite ignite){
+    protected void injectResources(Ignite ignite) {
         this.ignite = ignite;
 
         if (ignite != null) {
@@ -453,19 +454,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
         boolean isSpiConsistent = false;
 
-        String tipStr = " (fix configuration or set " + "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
+        String tipStr = " (fix configuration or set " +
+            "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
 
         if (rmtCls == null) {
             if (!optional && starting)
-                throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + " [name=" + name +
-                    ", loc=" + locCls + ']');
+                throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr +
+                    " [name=" + name + ", loc=" + locCls + ']');
 
             sb.a(format(">>> Remote SPI with the same name is not configured: " + name, locCls));
         }
         else if (!locCls.equals(rmtCls)) {
             if (!optional && starting)
-                throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr + " [name=" + name +
-                    ", loc=" + locCls + ", rmt=" + rmtCls + ']');
+                throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr +
+                    " [name=" + name + ", loc=" + locCls + ", rmt=" + rmtCls + ']');
 
             sb.a(format(">>> Remote SPI with the same name is of different type: " + name, locCls, rmtCls));
         }
@@ -542,9 +544,25 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     }
 
     /**
+     * @param obj Timeout object.
+     * @see IgniteSpiContext#addTimeoutObject(IgniteSpiTimeoutObject)
+     */
+    protected void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.addTimeoutObject(obj);
+    }
+
+    /**
+     * @param obj Timeout object.
+     * @see IgniteSpiContext#removeTimeoutObject(IgniteSpiTimeoutObject)
+     */
+    protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.removeTimeoutObject(obj);
+    }
+
+    /**
      * Temporarily SPI context.
      */
-    private static class GridDummySpiContext implements IgniteSpiContext {
+    private class GridDummySpiContext implements IgniteSpiContext {
         /** */
         private final ClusterNode locNode;
 
@@ -627,27 +645,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
-            @Nullable ClassLoader ldr) {
-            /* No-op. */
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public int partition(String cacheName, Object key) {
             return -1;
         }
 
         /** {@inheritDoc} */
-        @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
             return  locNode == null  ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
         }
@@ -713,12 +715,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
-            @Nullable ClassLoader ldr) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public MessageFormatter messageFormatter() {
             return msgFormatter;
         }
@@ -737,5 +733,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         @Override public boolean tryFailNode(UUID nodeId) {
             return false;
         }
+
+        /** {@inheritDoc} */
+        @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
+            ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
+            ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 6852b6d..f83326c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.spi.swapspace.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -253,30 +252,6 @@ public interface IgniteSpiContext {
     public <K> boolean containsKey(String cacheName, K key);
 
     /**
-     * Writes object to swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param val Value.
-     * @param ldr Class loader (optional).
-     * @throws IgniteException If any exception occurs.
-     */
-    public void writeToSwap(String spaceName, Object key, @Nullable Object val, @Nullable ClassLoader ldr)
-        throws IgniteException;
-
-    /**
-     * Reads object from swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param ldr Class loader (optional).
-     * @return Swapped value.
-     * @throws IgniteException If any exception occurs.
-     */
-    @Nullable public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr)
-        throws IgniteException;
-
-    /**
      * Calculates partition number for given key.
      *
      * @param cacheName Cache name.
@@ -286,16 +261,6 @@ public interface IgniteSpiContext {
     public int partition(String cacheName, Object key);
 
     /**
-     * Removes object from swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param ldr Class loader (optional).
-     * @throws IgniteException If any exception occurs.
-     */
-    public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) throws IgniteException;
-
-    /**
      * Validates that new node can join grid topology, this method is called on coordinator
      * node before new node joins topology.
      *
@@ -322,18 +287,6 @@ public interface IgniteSpiContext {
     public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteException;
 
     /**
-     * Reads swapped cache value from off-heap and swap.
-     *
-     * @param spaceName Off-heap space name.
-     * @param key Key.
-     * @param ldr Class loader for unmarshalling.
-     * @return Value.
-     * @throws IgniteException If any exception occurs.
-     */
-    @Nullable public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
-        @Nullable ClassLoader ldr) throws IgniteException;
-
-    /**
      * Gets message formatter.
      *
      * @return Message formatter.
@@ -357,4 +310,14 @@ public interface IgniteSpiContext {
      * @return If node was failed.
      */
     public boolean tryFailNode(UUID nodeId);
+
+    /**
+     * @param c Timeout object.
+     */
+    public void addTimeoutObject(IgniteSpiTimeoutObject c);
+
+    /**
+     * @param c Timeout object.
+     */
+    public void removeTimeoutObject(IgniteSpiTimeoutObject c);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
new file mode 100644
index 0000000..b3fc28e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.lang.*;
+
+/**
+ * Provides possibility to schedule delayed execution,
+ * see {@link IgniteSpiContext#addTimeoutObject(IgniteSpiTimeoutObject)}.
+ * <p>
+ * Note: all timeout objects are executed in single dedicated thread, so implementation
+ * of {@link #onTimeout()} should not use time consuming and blocking method.
+ */
+public interface IgniteSpiTimeoutObject {
+    /**
+     * @return Unique object ID.
+     */
+    public IgniteUuid id();
+
+    /**
+     * @return End time.
+     */
+    public long endTime();
+
+    /**
+     * Timeout callback.
+     */
+    public void onTimeout();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
index 460cff3..832d872 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
@@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException {
+    @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index fd17791..359de1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
 
-    /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */
-    public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100;
-
-    /** Default value for connection buffer size (value is <tt>0</tt>). */
-    public static final int DFLT_CONN_BUF_SIZE = 0;
-
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
@@ -267,7 +261,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                                 log.debug("Session was closed but there are unacknowledged messages, " +
                                                     "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
 
-                                            recoveryWorker.addReconnectRequest(recoveryData);
+                                            commWorker.addReconnectRequest(recoveryData);
                                         }
                                     }
                                     else
@@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Idle connection timeout. */
     private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
 
-    /** Connection buffer flush frequency. */
-    private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ;
-
-    /** Connection buffer size. */
-    @SuppressWarnings("RedundantFieldInitialization")
-    private int connBufSize = DFLT_CONN_BUF_SIZE;
-
     /** Connect timeout. */
     private long connTimeout = DFLT_CONN_TIMEOUT;
 
@@ -647,17 +634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
-    /** Idle client worker. */
-    private IdleClientWorker idleClientWorker;
-
-    /** Flush client worker. */
-    private ClientFlushWorker clientFlushWorker;
-
-    /** Socket timeout worker. */
-    private SocketTimeoutWorker sockTimeoutWorker;
-
-    /** Recovery worker. */
-    private RecoveryWorker recoveryWorker;
+    /** Recovery and idle clients handler. */
+    private CommunicationWorker commWorker;
 
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -882,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}.
      *
      * @param connBufSize Connection buffer size.
      * @see #setConnectionBufferFlushFrequency(long)
      */
     @IgniteSpiConfiguration(optional = true)
     public void setConnectionBufferSize(int connBufSize) {
-        this.connBufSize = connBufSize;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public int getConnectionBufferSize() {
-        return connBufSize;
+        return 0;
     }
 
     /** {@inheritDoc} */
     @IgniteSpiConfiguration(optional = true)
     @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
-        this.connBufFlushFreq = connBufFlushFreq;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public long getConnectionBufferFlushFrequency() {
-        return connBufFlushFreq;
+        return 0;
     }
 
     /**
@@ -1174,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
         assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
-        assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0");
-        assertParameter(connBufSize >= 0, "connBufSize >= 0");
         assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
@@ -1245,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("idleConnTimeout", idleConnTimeout));
             log.debug(configInfo("directBuf", directBuf));
             log.debug(configInfo("directSendBuf", directSndBuf));
-            log.debug(configInfo("connBufSize", connBufSize));
-            log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
             log.debug(configInfo("selectorsCnt", selectorsCnt));
             log.debug(configInfo("tcpNoDelay", tcpNoDelay));
             log.debug(configInfo("sockSndBuf", sockSndBuf));
@@ -1261,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
         }
 
-        if (connBufSize > 8192)
-            U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " +
-                "intentionally) [specified=" + connBufSize + ", recommended=8192]",
-                "Specified communication IO buffer size is larger than recommended (ignore if done intentionally).");
-
         if (!tcpNoDelay)
             U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
                 "since may produce significant delays with some scenarios.");
@@ -1274,23 +1241,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         nioSrvr.start();
 
-        idleClientWorker = new IdleClientWorker();
+        commWorker = new CommunicationWorker();
 
-        idleClientWorker.start();
-
-        recoveryWorker = new RecoveryWorker();
-
-        recoveryWorker.start();
-
-        if (connBufSize > 0) {
-            clientFlushWorker = new ClientFlushWorker();
-
-            clientFlushWorker.start();
-        }
-
-        sockTimeoutWorker = new SocketTimeoutWorker();
-
-        sockTimeoutWorker.start();
+        commWorker.start();
 
         // Ack start.
         if (log.isDebugEnabled())
@@ -1445,15 +1398,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
-        U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
-        U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         // Force closing on stop (safety).
         for (GridCommunicationClient client : clients.values())
@@ -1461,7 +1408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Clear resources.
         nioSrvr = null;
-        idleClientWorker = null;
+        commWorker = null;
 
         boundTcpPort = -1;
 
@@ -1899,7 +1846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
@@ -2005,7 +1952,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Ignoring whatever happened after timeout - reporting only timeout event.
             if (!cancelled)
@@ -2041,15 +1988,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
-        U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
-        U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         for (GridCommunicationClient client : clients.values())
             client.forceClose();
@@ -2156,80 +2097,95 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private class IdleClientWorker extends IgniteSpiThread {
+    private class CommunicationWorker extends IgniteSpiThread {
+        /** */
+        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
+
         /**
          *
          */
-        IdleClientWorker() {
-            super(gridName, "nio-idle-client-collector", log);
+        private CommunicationWorker() {
+            super(gridName, "tcp-comm-worker", log);
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
         @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Tcp communication worker has been started.");
+
             while (!isInterrupted()) {
-                cleanupRecovery();
+                GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
 
-                for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
-                    UUID nodeId = e.getKey();
+                if (recoveryDesc != null)
+                    processRecovery(recoveryDesc);
+                else
+                    processIdle();
+            }
+        }
 
-                    GridCommunicationClient client = e.getValue();
+        /**
+         *
+         */
+        private void processIdle() {
+            cleanupRecovery();
 
-                    ClusterNode node = getSpiContext().node(nodeId);
+            for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+                UUID nodeId = e.getKey();
 
-                    if (node == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Forcing close of non-existent node connection: " + nodeId);
+                GridCommunicationClient client = e.getValue();
 
-                        client.forceClose();
+                ClusterNode node = getSpiContext().node(nodeId);
 
-                        clients.remove(nodeId, client);
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing close of non-existent node connection: " + nodeId);
 
-                        continue;
-                    }
+                    client.forceClose();
 
-                    GridNioRecoveryDescriptor recovery = null;
+                    clients.remove(nodeId, client);
 
-                    if (client instanceof GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+                    continue;
+                }
 
-                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
-                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+                GridNioRecoveryDescriptor recovery = null;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
-                                    ", rcvCnt=" + msg.received() + ']');
+                if (client instanceof GridTcpNioCommunicationClient) {
+                    recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
 
-                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
 
-                            recovery.lastAcknowledged(msg.received());
+                        if (log.isDebugEnabled())
+                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                ", rcvCnt=" + msg.received() + ']');
 
-                            continue;
-                        }
-                    }
+                        nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
 
-                    long idleTime = client.getIdleTime();
+                        recovery.lastAcknowledged(msg.received());
 
-                    if (idleTime >= idleConnTimeout) {
-                        if (recovery != null &&
-                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                            !recovery.messagesFutures().isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
-                                    "will wait: " + nodeId);
+                        continue;
+                    }
+                }
 
-                            continue;
-                        }
+                long idleTime = client.getIdleTime();
 
+                if (idleTime >= idleConnTimeout) {
+                    if (recovery != null &&
+                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                        !recovery.messagesFutures().isEmpty()) {
                         if (log.isDebugEnabled())
-                            log.debug("Closing idle node connection: " + nodeId);
+                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                "will wait: " + nodeId);
 
-                        if (client.close() || client.closed())
-                            clients.remove(nodeId, client);
+                        continue;
                     }
-                }
 
-                Thread.sleep(idleConnTimeout);
+                    if (log.isDebugEnabled())
+                        log.debug("Closing idle node connection: " + nodeId);
+
+                    if (client.close() || client.closed())
+                        clients.remove(nodeId, client);
+                }
             }
         }
 
@@ -2264,212 +2220,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 }
             }
         }
-    }
-
-    /**
-     *
-     */
-    private class ClientFlushWorker extends IgniteSpiThread {
-        /**
-         *
-         */
-        ClientFlushWorker() {
-            super(gridName, "nio-client-flusher", log);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                long connBufFlushFreq0 = connBufFlushFreq;
-
-                for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
-                    GridCommunicationClient client = entry.getValue();
-
-                    if (client.reserve()) {
-                        boolean err = true;
-
-                        try {
-                            client.flushIfNeeded(connBufFlushFreq0);
-
-                            err = false;
-                        }
-                        catch (IOException e) {
-                            if (getSpiContext().pingNode(entry.getKey()))
-                                U.error(log, "Failed to flush client: " + client, e);
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to flush client (node left): " + client);
-
-                                onException("Failed to flush client (node left): " + client, e);
-                            }
-                        }
-                        finally {
-                            if (err)
-                                client.forceClose();
-                            else
-                                client.release();
-                        }
-                    }
-                }
-
-                Thread.sleep(connBufFlushFreq0);
-            }
-        }
-    }
-
-    /**
-     * Handles sockets timeouts.
-     */
-    private class SocketTimeoutWorker extends IgniteSpiThread {
-        /** Time-based sorted set for timeout objects. */
-        private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() {
-                @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) {
-                    long time1 = o1.endTime();
-                    long time2 = o2.endTime();
-
-                    long id1 = o1.id();
-                    long id2 = o2.id();
-
-                    return time1 < time2 ? -1 : time1 > time2 ? 1 :
-                        id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
-                }
-            });
-
-        /** Mutex. */
-        private final Object mux0 = new Object();
-
-        /**
-         *
-         */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-comm-sock-timeout-worker", log);
-        }
-
-        /**
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
-            timeoutObjs.add(timeoutObj);
-
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
-            }
-        }
 
         /**
-         * @param timeoutObj Timeout object to remove.
+         * @param recoveryDesc Recovery descriptor.
          */
-        public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
-
-            timeoutObjs.remove(timeoutObj);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
+        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
+            ClusterNode node = recoveryDesc.node();
 
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
-
-                for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    HandshakeTimeoutObject timeoutObj = iter.next();
-
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
+            if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
-                        timeoutObj.onTimeout();
-                    }
-                    else
-                        break;
-                }
-
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        HandshakeTimeoutObject first = timeoutObjs.firstx();
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
+                GridCommunicationClient client = reserveClient(node);
 
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux0.wait(5000);
-                    }
-                }
+                client.release();
             }
-        }
-    }
-
-    /**
-     *
-     */
-    private class RecoveryWorker extends IgniteSpiThread {
-        /** */
-        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
-
-        /**
-         *
-         */
-        private RecoveryWorker() {
-            super(gridName, "tcp-comm-recovery-worker", log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Recovery worker has been started.");
-
-            while (!isInterrupted()) {
-                GridNioRecoveryDescriptor recoveryDesc = q.take();
-
-                assert recoveryDesc != null;
-
-                ClusterNode node = recoveryDesc.node();
-
-                if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    continue;
-
-                try {
+            catch (IgniteCheckedException | IgniteException e) {
+                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
                     if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
-
-                    GridCommunicationClient client = reserveClient(node);
+                        log.debug("Recovery reconnect failed, will retry " +
+                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
 
-                    client.release();
+                    addReconnectRequest(recoveryDesc);
                 }
-                catch (IgniteCheckedException e) {
-                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, will retry " +
-                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
-
-                        addReconnectRequest(recoveryDesc);
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, " +
-                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
-
-                        onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                            e);
-                    }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, " +
+                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
 
+                    onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+                        e);
                 }
             }
         }
@@ -2497,12 +2280,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private static class HandshakeTimeoutObject<T> {
-        /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
+    private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
         /** */
-        private final long id = idGen.incrementAndGet();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private final T obj;
@@ -2533,34 +2313,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return done.compareAndSet(false, true);
         }
 
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
             if (done.compareAndSet(false, true)) {
                 // Close socket - timeout occurred.
                 if (obj instanceof GridCommunicationClient)
                     ((GridCommunicationClient)obj).forceClose();
                 else
                     U.closeQuiet((AbstractInterruptibleChannel)obj);
-
-                return true;
             }
-
-            return false;
         }
 
-        /**
-         * @return End time.
-         */
-        long endTime() {
+        /** {@inheritDoc} */
+        @Override public long endTime() {
             return endTime;
         }
 
-        /**
-         * @return ID.
-         */
-        long id() {
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
             return id;
         }