You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/06/09 07:20:43 UTC

[01/29] incubator-ignite git commit: # IGNITE-883 Set named thread factory to pool. (cherry picked from commit 0ac2ff2)

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-992 0eee664c0 -> 8740b6e76


# IGNITE-883 Set named thread factory to pool.
(cherry picked from commit 0ac2ff2)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dce67896
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dce67896
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dce67896

Branch: refs/heads/ignite-992
Commit: dce67896a81c6a2148e53284aef49e5c1aa4d3b7
Parents: 97d0b04
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Jun 1 18:01:34 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Jun 1 19:48:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/service/GridServiceProcessor.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dce67896/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 2e31b69..64eb1c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -70,7 +70,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>();
 
     /** Deployment executor service. */
-    private final ExecutorService depExe = Executors.newSingleThreadExecutor();
+    private final ExecutorService depExe;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -98,6 +98,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     public GridServiceProcessor(GridKernalContext ctx) {
         super(ctx);
+
+        depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy"));
     }
 
     /** {@inheritDoc} */


[11/29] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-883_1' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'origin/ignite-883_1' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a6761529
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a6761529
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a6761529

Branch: refs/heads/ignite-992
Commit: a6761529504b9df26703558f9452d53eb9e6dd0f
Parents: ad2f4ef 3417b3d
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 09:57:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 09:57:06 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  13 +-
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../managers/communication/GridIoManager.java   | 117 ++++----
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  19 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../processors/hadoop/HadoopTaskContext.java    |  14 +-
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   2 +-
 .../internal/visor/query/VisorQueryJob.java     |   2 +-
 ...niteDynamicCacheWithConfigStartSelfTest.java |  97 +++++++
 .../igfs/IgfsClientCacheSelfTest.java           |   9 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   1 +
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  14 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  70 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
 .../hadoop/SecondaryFileSystemProvider.java     |   4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  20 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  31 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  26 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  48 +++-
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../hadoop/HadoopAbstractSelfTest.java          |  14 +-
 .../hadoop/HadoopCommandLineTest.java           |  14 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 176 +++++++++++-
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../collections/HadoopAbstractMapTest.java      |  12 +
 ...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 .../commands/cache/VisorCacheScanCommand.scala  |   2 +-
 38 files changed, 1122 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6761529/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------


[23/29] incubator-ignite git commit: # ignite-sprint-5 added test for ignite-999

Posted by ak...@apache.org.
# ignite-sprint-5 added test for ignite-999


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/837462f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/837462f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/837462f5

Branch: refs/heads/ignite-992
Commit: 837462f5dade1930a8faaffb466c7bf05f9e06f9
Parents: a5b5ec7
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 14:55:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 14:55:20 2015 +0300

----------------------------------------------------------------------
 .../DataStreamerMultinodeCreateCacheTest.java   | 97 ++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |  1 +
 2 files changed, 98 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/837462f5/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
new file mode 100644
index 0000000..2d19d6f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(5000);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(5000);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateCacheAndStream() throws Exception {
+        final int THREADS = 5;
+
+        startGrids(THREADS);
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int threadIdx = idx.getAndIncrement();
+
+                long stopTime = System.currentTimeMillis() + 60_000;
+
+                Ignite ignite = grid(threadIdx);
+
+                int iter = 0;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    String cacheName = "cache-" + threadIdx + "-" + iter;
+
+                    try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheName)) {
+                        try (IgniteDataStreamer<Object, Object> stmr = ignite.dataStreamer(cacheName)) {
+                            ((DataStreamerImpl<Object, Object>)stmr).maxRemapCount(0);
+
+                            for (int i = 0; i < 1000; i++)
+                                stmr.addData(i, i);
+                        }
+                    }
+
+                    iter++;
+                }
+
+                return null;
+            }
+        }, THREADS, "create-cache");
+
+        fut.get(2 * 60_000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/837462f5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 9634e9a..7e4409d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -111,6 +111,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
         suite.addTestSuite(DataStreamProcessorSelfTest.class);
         suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
+        suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);
         suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class);
         suite.addTestSuite(GridCacheClearAllSelfTest.class);


[13/29] incubator-ignite git commit: # ignite-883

Posted by ak...@apache.org.
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb827a77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb827a77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb827a77

Branch: refs/heads/ignite-992
Commit: fb827a7784614343ae639ea8b856d2f9f88d46db
Parents: db57652
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 11:41:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 15:10:00 2015 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java |  31 +-
 .../timeout/GridSpiTimeoutObject.java           |  14 +
 .../util/nio/GridCommunicationClient.java       |   6 -
 .../util/nio/GridTcpCommunicationClient.java    | 554 -------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   8 -
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   4 +
 .../communication/tcp/TcpCommunicationSpi.java  |  97 +---
 .../tcp/TcpCommunicationSpiMBean.java           |   2 -
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   4 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   2 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   2 -
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  13 +-
 14 files changed, 61 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 2138639..aa3bfe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -101,7 +101,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
 
     /** */
-    private UUID qryId;
+    private volatile UUID qryId;
 
     /**
      * @param ctx Context.
@@ -144,11 +144,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             seqView = atomicsCache;
 
             dsCacheCtx = atomicsCache.context();
+        }
+    }
 
-            qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
-                new DataStructuresEntryFilter(),
-                dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
-                false);
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void startQuery() throws IgniteCheckedException {
+        if (qryId == null) {
+            synchronized (this) {
+                if (qryId == null) {
+                    qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+                        new DataStructuresEntryFilter(),
+                        dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                        false);
+                }
+            }
         }
     }
 
@@ -178,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
             @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -304,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
             @Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
                 final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -507,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
             @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -608,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
             @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException {
                 GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
@@ -916,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
             @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
index 82267a2..a0fd9b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -53,6 +53,20 @@ public class GridSpiTimeoutObject implements GridTimeoutObject {
     }
 
     /** {@inheritDoc} */
+    @Override public int hashCode() {
+        assert false;
+
+        return super.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        assert false;
+
+        return super.equals(obj);
+    }
+
+    /** {@inheritDoc} */
     @Override public final String toString() {
         return S.toString(GridSpiTimeoutObject.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 2f7fd88..693a5a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -100,12 +100,6 @@ public interface GridCommunicationClient {
     public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    public void flushIfNeeded(long timeout) throws IOException;
-
-    /**
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
deleted file mode 100644
index 72c20f8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Grid client for NIO server.
- */
-public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
-    /** Socket. */
-    private final Socket sock;
-
-    /** Output stream. */
-    private final UnsafeBufferedOutputStream out;
-
-    /** Minimum buffered message count. */
-    private final int minBufferedMsgCnt;
-
-    /** Communication buffer size ratio. */
-    private final double bufSizeRatio;
-
-    /** */
-    private final ByteBuffer writeBuf;
-
-    /** */
-    private final MessageFormatter formatter;
-
-    /**
-     * @param metricsLsnr Metrics listener.
-     * @param addr Address.
-     * @param locHost Local address.
-     * @param connTimeout Connect timeout.
-     * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
-     * @param sockRcvBuf Socket receive buffer.
-     * @param sockSndBuf Socket send buffer.
-     * @param bufSize Buffer size (or {@code 0} to disable buffer).
-     * @param minBufferedMsgCnt Minimum buffered message count.
-     * @param bufSizeRatio Communication buffer size ratio.
-     * @param formatter Message formatter.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridTcpCommunicationClient(
-        GridNioMetricsListener metricsLsnr,
-        InetSocketAddress addr,
-        InetAddress locHost,
-        long connTimeout,
-        boolean tcpNoDelay,
-        int sockRcvBuf,
-        int sockSndBuf,
-        int bufSize,
-        int minBufferedMsgCnt,
-        double bufSizeRatio,
-        MessageFormatter formatter
-    ) throws IgniteCheckedException {
-        super(metricsLsnr);
-
-        assert metricsLsnr != null;
-        assert addr != null;
-        assert locHost != null;
-        assert connTimeout >= 0;
-        assert bufSize >= 0;
-
-        A.ensure(minBufferedMsgCnt >= 0,
-            "Value of minBufferedMessageCount property cannot be less than zero.");
-        A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
-            "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
-
-        this.minBufferedMsgCnt = minBufferedMsgCnt;
-        this.bufSizeRatio = bufSizeRatio;
-        this.formatter = formatter;
-
-        writeBuf = ByteBuffer.allocate(8 << 10);
-
-        writeBuf.order(ByteOrder.nativeOrder());
-
-        sock = new Socket();
-
-        boolean success = false;
-
-        try {
-            sock.bind(new InetSocketAddress(locHost, 0));
-
-            sock.setTcpNoDelay(tcpNoDelay);
-
-            if (sockRcvBuf > 0)
-                sock.setReceiveBufferSize(sockRcvBuf);
-
-            if (sockSndBuf > 0)
-                sock.setSendBufferSize(sockSndBuf);
-
-            sock.connect(addr, (int)connTimeout);
-
-            out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
-
-            success = true;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to connect to remote host " +
-                "[addr=" + addr + ", localHost=" + locHost + ']', e);
-        }
-        finally {
-            if (!success)
-                U.closeQuiet(sock);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
-        try {
-            handshakeC.applyx(sock.getInputStream(), sock.getOutputStream());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " +
-                sock.getRemoteSocketAddress(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        boolean res = super.close();
-
-        if (res) {
-            U.closeQuiet(out);
-            U.closeQuiet(sock);
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        super.forceClose();
-
-        try {
-            out.flush();
-        }
-        catch (IOException ignored) {
-            // No-op.
-        }
-
-        // Do not call (directly or indirectly) out.close() here
-        // since it may cause a deadlock.
-        out.forceClose();
-
-        U.closeQuiet(sock);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        try {
-            out.write(data, 0, len);
-
-            metricsLsnr.onBytesSent(len);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
-        throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        assert writeBuf.hasArray();
-
-        try {
-            int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer());
-
-            metricsLsnr.onBytesSent(cnt);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-
-        return false;
-    }
-
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        assert timeout > 0;
-
-        out.flushOnTimeout(timeout);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTcpCommunicationClient.class, this, super.toString());
-    }
-
-    /**
-     *
-     */
-    private class UnsafeBufferedOutputStream extends FilterOutputStream {
-        /** The internal buffer where data is stored. */
-        private final byte buf[];
-
-        /** Current size. */
-        private int size;
-
-        /** Count. */
-        private int cnt;
-
-        /** Message count. */
-        private int msgCnt;
-
-        /** Total messages size. */
-        private int totalCnt;
-
-        /** Lock. */
-        private final ReentrantLock lock = new ReentrantLock();
-
-        /** Last flushed timestamp. */
-        private volatile long lastFlushed = U.currentTimeMillis();
-
-        /** Cached flush timeout. */
-        private volatile long flushTimeout;
-
-        /** Buffer adjusted timestamp. */
-        private long lastAdjusted = U.currentTimeMillis();
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream.
-         *
-         * @param out The underlying output stream.
-         */
-        UnsafeBufferedOutputStream(OutputStream out) {
-            this(out, 8192);
-        }
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream with the specified buffer
-         * size.
-         *
-         * @param out The underlying output stream.
-         * @param size The buffer size.
-         */
-        UnsafeBufferedOutputStream(OutputStream out, int size) {
-            super(out);
-
-            assert size >= 0;
-
-            this.size = size;
-            buf = size > 0 ? new byte[size] : null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(int b) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            assert b != null;
-            assert off == 0;
-
-            // No buffering.
-            if (buf == null) {
-                lock.lock();
-
-                try {
-                    out.write(b, 0, len);
-                }
-                finally {
-                    lock.unlock();
-                }
-
-                return;
-            }
-
-            // Buffering is enabled.
-            lock.lock();
-
-            try {
-                msgCnt++;
-                totalCnt += len;
-
-                if (len >= size) {
-                    flushLocked();
-
-                    out.write(b, 0, len);
-
-                    lastFlushed = U.currentTimeMillis();
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                if (cnt + len > size) {
-                    flushLocked();
-
-                    messageToBuffer0(b, off, len, buf, 0);
-
-                    cnt = len;
-
-                    assert cnt < size;
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                messageToBuffer0(b, 0, len, buf, cnt);
-
-                cnt += len;
-
-                if (cnt == size)
-                    flushLocked();
-                else
-                    flushIfNeeded();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushIfNeeded() throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                flushOnTimeoutLocked(flushTimeout0);
-        }
-
-        /**
-         *
-         */
-        private void adjustBufferIfNeeded() {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                adjustBufferLocked(flushTimeout0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void flush() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        public void flushOnTimeout(long timeout) throws IOException {
-            assert buf != null;
-            assert timeout > 0;
-
-            // Overwrite cached value.
-            flushTimeout = timeout;
-
-            if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock())
-                return;
-
-            try {
-                flushOnTimeoutLocked(timeout);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        private void flushOnTimeoutLocked(long timeout) throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            // Double check.
-            if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
-                return;
-
-            flushLocked();
-
-            adjustBufferLocked(timeout);
-        }
-
-        /**
-         * @param timeout Timeout.
-         */
-        private void adjustBufferLocked(long timeout) {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            long time = U.currentTimeMillis();
-
-            if (lastAdjusted + timeout < time) {
-                if (msgCnt <= minBufferedMsgCnt)
-                    size = 0;
-                else {
-                    size = (int)(totalCnt * bufSizeRatio);
-
-                    if (size > buf.length)
-                        size = buf.length;
-                }
-
-                msgCnt = 0;
-                totalCnt = 0;
-
-                lastAdjusted = time;
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushLocked() throws IOException {
-            assert lock.isHeldByCurrentThread();
-
-            if (buf != null && cnt > 0) {
-                out.write(buf, 0, cnt);
-
-                cnt = 0;
-            }
-
-            out.flush();
-
-            lastFlushed = U.currentTimeMillis();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                try {
-                    out.close();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        /**
-         * Forcibly closes underlying stream ignoring any possible exception.
-         */
-        public void forceClose() {
-            try {
-                out.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-        }
-
-        /**
-         * @param b Buffer to copy from.
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) {
-            assert b.length == len;
-            assert off == 0;
-            assert resBuf.length >= resOff + len + 4;
-
-            U.intToBytes(len, resBuf, resOff);
-
-            U.arrayCopy(b, off, resBuf, resOff + 4, len);
-        }
-
-        /**
-         * @param b Buffer to copy from (length included).
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) {
-            assert off == 0;
-            assert resBuf.length >= resOff + len;
-
-            U.arrayCopy(b, off, resBuf, resOff, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            lock.lock();
-
-            try {
-                return S.toString(UnsafeBufferedOutputStream.class, this);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 788a8e6..abad875 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
         return false;
     }
 
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public boolean async() {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c9c633f..d095491 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -730,11 +730,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
         /** {@inheritDoc} */
         @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
             ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
         @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
             ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b324ab2..359de1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
 
-    /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */
-    public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100;
-
-    /** Default value for connection buffer size (value is <tt>0</tt>). */
-    public static final int DFLT_CONN_BUF_SIZE = 0;
-
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
@@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Idle connection timeout. */
     private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
 
-    /** Connection buffer flush frequency. */
-    private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ;
-
-    /** Connection buffer size. */
-    @SuppressWarnings("RedundantFieldInitialization")
-    private int connBufSize = DFLT_CONN_BUF_SIZE;
-
     /** Connect timeout. */
     private long connTimeout = DFLT_CONN_TIMEOUT;
 
@@ -647,9 +634,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
-    /** Flush client worker. */
-    private ClientFlushWorker clientFlushWorker;
-
     /** Recovery and idle clients handler. */
     private CommunicationWorker commWorker;
 
@@ -876,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}.
      *
      * @param connBufSize Connection buffer size.
      * @see #setConnectionBufferFlushFrequency(long)
      */
     @IgniteSpiConfiguration(optional = true)
     public void setConnectionBufferSize(int connBufSize) {
-        this.connBufSize = connBufSize;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public int getConnectionBufferSize() {
-        return connBufSize;
+        return 0;
     }
 
     /** {@inheritDoc} */
     @IgniteSpiConfiguration(optional = true)
     @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
-        this.connBufFlushFreq = connBufFlushFreq;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public long getConnectionBufferFlushFrequency() {
-        return connBufFlushFreq;
+        return 0;
     }
 
     /**
@@ -1168,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
         assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
-        assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0");
-        assertParameter(connBufSize >= 0, "connBufSize >= 0");
         assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
@@ -1239,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("idleConnTimeout", idleConnTimeout));
             log.debug(configInfo("directBuf", directBuf));
             log.debug(configInfo("directSendBuf", directSndBuf));
-            log.debug(configInfo("connBufSize", connBufSize));
-            log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
             log.debug(configInfo("selectorsCnt", selectorsCnt));
             log.debug(configInfo("tcpNoDelay", tcpNoDelay));
             log.debug(configInfo("sockSndBuf", sockSndBuf));
@@ -1255,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
         }
 
-        if (connBufSize > 8192)
-            U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " +
-                "intentionally) [specified=" + connBufSize + ", recommended=8192]",
-                "Specified communication IO buffer size is larger than recommended (ignore if done intentionally).");
-
         if (!tcpNoDelay)
             U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
                 "since may produce significant delays with some scenarios.");
@@ -1272,12 +1245,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         commWorker.start();
 
-        if (connBufSize > 0) {
-            clientFlushWorker = new ClientFlushWorker();
-
-            clientFlushWorker.start();
-        }
-
         // Ack start.
         if (log.isDebugEnabled())
             log.debug(startInfo());
@@ -1431,10 +1398,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(clientFlushWorker);
         U.interrupt(commWorker);
 
-        U.join(clientFlushWorker, log);
         U.join(commWorker, log);
 
         // Force closing on stop (safety).
@@ -2023,10 +1988,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(clientFlushWorker);
         U.interrupt(commWorker);
 
-        U.join(clientFlushWorker, log);
         U.join(commWorker, log);
 
         for (GridCommunicationClient client : clients.values())
@@ -2134,58 +2097,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private class ClientFlushWorker extends IgniteSpiThread {
-        /**
-         *
-         */
-        ClientFlushWorker() {
-            super(gridName, "nio-client-flusher", log);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                long connBufFlushFreq0 = connBufFlushFreq;
-
-                for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
-                    GridCommunicationClient client = entry.getValue();
-
-                    if (client.reserve()) {
-                        boolean err = true;
-
-                        try {
-                            client.flushIfNeeded(connBufFlushFreq0);
-
-                            err = false;
-                        }
-                        catch (IOException e) {
-                            if (getSpiContext().pingNode(entry.getKey()))
-                                U.error(log, "Failed to flush client: " + client, e);
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to flush client (node left): " + client);
-
-                                onException("Failed to flush client (node left): " + client, e);
-                            }
-                        }
-                        finally {
-                            if (err)
-                                client.forceClose();
-                            else
-                                client.release();
-                        }
-                    }
-                }
-
-                Thread.sleep(connBufFlushFreq0);
-            }
-        }
-    }
-
-    /**
-     *
-     */
     private class CommunicationWorker extends IgniteSpiThread {
         /** */
         private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..6f5a738 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
      * This frequency defines how often system will advice to flush
      * connection buffer.
      * <p>
-     * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}.
-     * <p>
      * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}.
      *
      * @param connBufFlushFreq Flush frequency.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index e3baeb0..bdf9929 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -1286,7 +1286,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test client to use instead of {@link GridTcpCommunicationClient}
+     * Test client to use instead of {@link GridTcpNioCommunicationClient}
      */
     private static class TestClient implements AutoCloseable {
         /** Socket implementation to use. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index ea51aff..8d27485 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -64,7 +64,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
 
         // Test idle clients remove.
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             assertEquals(2, clients.size());
 
@@ -77,7 +77,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.afterTest();
 
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
                 info("Check failed for SPI [grid=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c038ee7..2d175f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -256,7 +256,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
                     for (CommunicationSpi spi : spis) {
-                        ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+                        ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
                         assertEquals(1, clients.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 34fa610..c4a0916 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -32,8 +32,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPort", 65636);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPortRange", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "idleConnectionTimeout", 0);
-        checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferSize", -1);
-        checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferFlushFrequency", 0);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketReceiveBuffer", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketSendBuffer", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index e7ae957..3916f02 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -501,7 +501,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
         }
 
         for (CommunicationSpi spi : spis.values()) {
-            final ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+            final ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             assert GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 9c6fbb4..61bb944 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.config.*;
 import org.apache.ignite.testframework.junits.*;
 import org.apache.ignite.testframework.junits.spi.*;
@@ -267,9 +268,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
 
             Collection<UUID> nodeIds = new HashSet<>();
 
-            for (IgniteTestResources rsrc : spiRsrcs) {
+            for (IgniteTestResources rsrc : spiRsrcs)
                 nodeIds.add(rsrc.getNodeId());
-            }
 
             for (ClusterNode node : spi.getRemoteNodes()) {
                 if (nodeIds.contains(node.id())) {
@@ -390,6 +390,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
                     }
                 });
 
+                GridSpiTestContext ctx = initSpiContext();
+
+                GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "spiCtx", ctx);
+
                 spi.spiStart(getTestGridName() + i);
 
                 spis.add(spi);
@@ -397,7 +401,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
                 spiRsrcs.add(rsrcMgr);
 
                 // Force to use test context instead of default dummy context.
-                spi.onContextInitialized(initSpiContext());
+                spi.onContextInitialized(ctx);
             }
         }
         catch (Throwable e) {
@@ -438,9 +442,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
             spi.spiStop();
         }
 
-        for (IgniteTestResources rscrs : spiRsrcs) {
+        for (IgniteTestResources rscrs : spiRsrcs)
             rscrs.stopThreads();
-        }
 
         // Clear.
         spis.clear();



[17/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0a4e7dd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0a4e7dd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0a4e7dd4

Branch: refs/heads/ignite-992
Commit: 0a4e7dd4c8321bb163cd8568494c97a1a4aef437
Parents: 50b0b49 59db4a5
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 10:10:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 10:10:59 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |  11 +
 .../apache/ignite/internal/IgniteKernal.java    |   2 +
 .../ignite/internal/IgniteNodeAttributes.java   |   5 +-
 .../cache/DynamicCacheDescriptor.java           |  17 +
 .../processors/cache/GridCacheAdapter.java      |  17 +-
 .../processors/cache/GridCacheContext.java      |  13 +
 .../processors/cache/GridCacheProcessor.java    |  37 ++-
 .../processors/cache/GridCacheProxyImpl.java    |  14 +-
 .../processors/cache/IgniteCacheProxy.java      |  23 ++
 .../processors/cache/IgniteInternalCache.java   |  11 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +
 .../transactions/IgniteTxLocalAdapter.java      |  28 ++
 .../processors/igfs/IgfsMetaManager.java        |   2 +-
 .../IgniteTxRollbackCheckedException.java       |   9 +
 ...acheReadOnlyTransactionalClientSelfTest.java | 327 +++++++++++++++++++
 .../cache/GridCacheAbstractFullApiSelfTest.java |  83 +++++
 .../GridCacheExAbstractFullApiSelfTest.java     | 103 ------
 .../GridCacheExColocatedFullApiSelfTest.java    |  33 --
 .../near/GridCacheExNearFullApiSelfTest.java    |  39 ---
 .../GridCacheExReplicatedFullApiSelfTest.java   |  33 --
 .../local/GridCacheExLocalFullApiSelfTest.java  |  30 --
 .../loadtests/hashmap/GridCacheTestContext.java |   1 +
 .../IgniteCacheFullApiSelfTestSuite.java        |   6 -
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 24 files changed, 591 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a4e7dd4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a4e7dd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a4e7dd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------


[06/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4184f363
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4184f363
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4184f363

Branch: refs/heads/ignite-992
Commit: 4184f363c340b35008aff553f426d872797d7ee9
Parents: 873e01b 97d0bc1
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jun 3 18:05:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 18:05:13 2015 +0300

----------------------------------------------------------------------
 assembly/dependencies-visor-console.xml         |    3 +
 .../ignite/cache/eviction/EvictableEntry.java   |    7 +
 .../ignite/cache/eviction/EvictionPolicy.java   |    2 +
 .../cache/eviction/fifo/FifoEvictionPolicy.java |  117 +-
 .../eviction/fifo/FifoEvictionPolicyMBean.java  |   22 +
 .../cache/eviction/lru/LruEvictionPolicy.java   |  135 ++-
 .../eviction/lru/LruEvictionPolicyMBean.java    |   38 +
 .../eviction/random/RandomEvictionPolicy.java   |   10 +-
 .../eviction/sorted/SortedEvictionPolicy.java   |  141 ++-
 .../sorted/SortedEvictionPolicyMBean.java       |   22 +
 .../apache/ignite/cache/query/QueryMetrics.java |    6 +-
 .../discovery/GridDiscoveryManager.java         |    4 +-
 .../cache/CacheEvictableEntryImpl.java          |   31 +
 .../processors/cache/CacheMetricsImpl.java      |    4 +-
 .../processors/cache/GridCacheMapEntry.java     |   64 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |   11 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   13 +-
 .../GridDhtPartitionsExchangeFuture.java        |   46 +-
 .../dht/preloader/GridDhtPreloader.java         |    9 +-
 .../local/atomic/GridLocalAtomicCache.java      |   25 +-
 .../cache/query/GridCacheQueryAdapter.java      |   12 +-
 .../processors/query/GridQueryProcessor.java    |  305 +++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    2 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   27 +
 .../cache/GridCacheAbstractMetricsSelfTest.java |   48 +-
 .../GridCacheConcurrentTxMultiNodeTest.java     |    8 +-
 ...idCacheConfigurationConsistencySelfTest.java |   14 +-
 .../cache/GridCacheMemoryModeSelfTest.java      |   23 +-
 .../processors/cache/GridCacheOffHeapTest.java  |    5 +-
 .../cache/GridCacheReloadSelfTest.java          |    6 +-
 .../cache/IgniteCachePeekModesAbstractTest.java |    5 +-
 ...GridCacheDhtEvictionNearReadersSelfTest.java |   11 +-
 .../dht/GridCacheDhtEvictionSelfTest.java       |   11 +-
 .../dht/IgniteCacheMultiTxLockSelfTest.java     |    6 +-
 .../GridCachePartitionedEvictionSelfTest.java   |   11 +-
 .../GridCachePartitionedFullApiSelfTest.java    |   32 +
 ...ePartitionedMultiThreadedPutGetSelfTest.java |    6 +-
 ...edOffHeapTieredMultiNodeFullApiSelfTest.java |    2 +-
 .../cache/eviction/EvictionAbstractTest.java    | 1056 ++++++++++++++++++
 .../GridCacheBatchEvictUnswapSelfTest.java      |    5 +-
 ...heConcurrentEvictionConsistencySelfTest.java |   82 +-
 .../GridCacheConcurrentEvictionsSelfTest.java   |   29 +-
 .../GridCacheDistributedEvictionsSelfTest.java  |    5 +-
 .../GridCacheEmptyEntriesAbstractSelfTest.java  |   11 +-
 .../eviction/GridCacheEvictionAbstractTest.java |  484 --------
 .../GridCacheEvictionTouchSelfTest.java         |   22 +-
 .../cache/eviction/GridCacheMockEntry.java      |    5 +
 .../fifo/FifoEvictionPolicySelfTest.java        |  262 +++++
 ...ridCacheFifoBatchEvictionPolicySelfTest.java |  384 -------
 .../GridCacheFifoEvictionPolicySelfTest.java    |  372 ------
 .../lru/GridCacheLruEvictionPolicySelfTest.java |  417 -------
 .../GridCacheLruNearEvictionPolicySelfTest.java |  136 ---
 ...heNearOnlyLruNearEvictionPolicySelfTest.java |  168 ---
 .../eviction/lru/LruEvictionPolicySelfTest.java |  353 ++++++
 .../lru/LruNearEvictionPolicySelfTest.java      |  140 +++
 .../LruNearOnlyNearEvictionPolicySelfTest.java  |  172 +++
 .../GridCacheRandomEvictionPolicySelfTest.java  |  258 -----
 .../RandomEvictionPolicyCacheSizeSelfTest.java  |    6 +
 .../random/RandomEvictionPolicySelfTest.java    |  357 ++++++
 ...dCacheSortedBatchEvictionPolicySelfTest.java |  385 -------
 ...acheSortedEvictionPolicyPerformanceTest.java |  135 ---
 .../GridCacheSortedEvictionPolicySelfTest.java  |  373 -------
 .../SortedEvictionPolicyPerformanceTest.java    |  134 +++
 .../sorted/SortedEvictionPolicySelfTest.java    |  266 +++++
 .../loadtests/GridCacheMultiNodeLoadTest.java   |    5 +-
 .../GridCachePartitionedAtomicLongLoadTest.java |    6 +-
 .../swap/GridSwapEvictAllBenchmark.java         |    6 +-
 .../IgniteCacheEvictionSelfTestSuite.java       |   14 +-
 .../GridCacheOffheapIndexEntryEvictTest.java    |  200 ++++
 .../cache/GridCacheOffheapIndexGetSelfTest.java |   18 +-
 .../cache/GridCacheQueryMetricsSelfTest.java    |   84 +-
 .../cache/GridIndexingWithNoopSwapSelfTest.java |    6 +-
 ...QueryMultiThreadedOffHeapTieredSelfTest.java |   37 +
 ...eQueryMultiThreadedOffHeapTiredSelfTest.java |   37 -
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |   11 +-
 .../cache/ttl/CacheTtlAbstractSelfTest.java     |    6 +-
 .../IgniteCacheQuerySelfTestSuite.java          |    2 +-
 .../IgniteCacheWithIndexingTestSuite.java       |    1 +
 scripts/git-apply-patch.sh                      |    8 +-
 scripts/git-format-patch.sh                     |    6 +-
 scripts/git-patch-functions.sh                  |   36 +-
 81 files changed, 4240 insertions(+), 3504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4184f363/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4184f363/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------


[26/29] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5

Posted by ak...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff7827e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff7827e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff7827e4

Branch: refs/heads/ignite-992
Commit: ff7827e479b1fbcb79d934d8f240ae0a2ff9fc3d
Parents: f1cfd29 2a8e2ab
Author: avinogradov <av...@gridgain.com>
Authored: Mon Jun 8 15:18:21 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Mon Jun 8 15:18:21 2015 +0300

----------------------------------------------------------------------
 .../DataStreamerMultinodeCreateCacheTest.java   | 97 ++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |  1 +
 2 files changed, 98 insertions(+)
----------------------------------------------------------------------



[07/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a3eb572c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a3eb572c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a3eb572c

Branch: refs/heads/ignite-992
Commit: a3eb572ceeeb09f293f613ec6a572f8bc2a77ca0
Parents: 4184f36 ae5189a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:50:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 09:50:46 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |   3 -
 .../dht/GridDhtPartitionTopologyImpl.java       |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  10 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  31 -----
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  56 +++++++-
 .../tcp/ipfinder/TcpDiscoveryIpFinder.java      |  10 +-
 .../TcpDiscoveryMulticastIpFinder.java          |  47 +++++--
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  62 +++++++++
 .../tcp/TcpClientDiscoverySpiMulticastTest.java | 129 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  |  43 ++++---
 12 files changed, 321 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3eb572c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3eb572c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------


[20/29] incubator-ignite git commit: ignite-883

Posted by ak...@apache.org.
ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c7bc598a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c7bc598a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c7bc598a

Branch: refs/heads/ignite-992
Commit: c7bc598a32c06cd1acc6f3e1b162d059d80a53f2
Parents: f4da46c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 11:16:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 11:16:57 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/managers/discovery/GridDiscoveryManager.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7bc598a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 5983553..142dbaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -173,7 +173,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         new ConcurrentHashMap8<>();
 
     /** Map of dynamic cache filters. */
-    private Map<String, CachePredicate> registeredCaches = new ConcurrentHashMap<>();
+    private Map<String, CachePredicate> registeredCaches = new HashMap<>();
 
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();


[22/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-883_1' into ignite-sprint-5

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-883_1' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/015afdbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/015afdbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/015afdbf

Branch: refs/heads/ignite-992
Commit: 015afdbf1921e6ae5e8cd2f92efc8b4a695f05ca
Parents: 8467a3c c7bc598
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 13:38:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 13:38:48 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  26 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   8 +-
 .../internal/managers/GridManagerAdapter.java   |   9 +
 .../checkpoint/GridCheckpointManager.java       |  52 +-
 .../discovery/GridDiscoveryManager.java         |  28 +-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../GridCachePartitionExchangeManager.java      |  26 +-
 .../processors/cache/GridCacheTtlManager.java   |   9 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  18 +-
 .../GridDhtPartitionsExchangeFuture.java        |  50 +-
 .../cache/transactions/IgniteTxManager.java     |   3 -
 .../datastructures/DataStructuresProcessor.java | 107 +++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../timeout/GridSpiTimeoutObject.java           |  73 +++
 .../timeout/GridTimeoutProcessor.java           | 105 +++-
 .../util/nio/GridCommunicationClient.java       |  30 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../util/nio/GridTcpCommunicationClient.java    | 554 -------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   8 -
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  27 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  10 +
 .../ignite/spi/IgniteSpiTimeoutObject.java      |  44 ++
 .../spi/checkpoint/noop/NoopCheckpointSpi.java  |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 438 ++++-----------
 .../tcp/TcpCommunicationSpiMBean.java           |   2 -
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  10 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 156 +-----
 .../IgniteCountDownLatchAbstractSelfTest.java   | 102 ++++
 .../IgniteCacheClientNearCacheExpiryTest.java   | 103 ++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   4 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   2 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   2 -
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  13 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  25 +
 .../testframework/GridSpiTestContext.java       |  10 +
 39 files changed, 886 insertions(+), 1201 deletions(-)
----------------------------------------------------------------------



[21/29] incubator-ignite git commit: # ignite-sprint-5 sanity check for classname.properties load

Posted by ak...@apache.org.
# ignite-sprint-5 sanity check for classname.properties load


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8467a3c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8467a3c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8467a3c3

Branch: refs/heads/ignite-992
Commit: 8467a3c36414192268b8d852715fc501f53b4772
Parents: a5b5ec7
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 12:42:36 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 12:42:36 2015 +0300

----------------------------------------------------------------------
 .../internal/MarshallerContextAdapter.java      | 36 ++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8467a3c3/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
index 5dca2f2..21f2264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
 import org.jsr166.*;
@@ -49,10 +50,29 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
 
             Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE);
 
-            while (urls.hasMoreElements())
+            boolean foundClsNames = false;
+
+            while (urls.hasMoreElements()) {
                 processResource(urls.nextElement());
 
-            processResource(ldr.getResource(JDK_CLS_NAMES_FILE));
+                foundClsNames = true;
+            }
+
+            if (!foundClsNames)
+                throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+                    "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+            URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE);
+
+            if (jdkClsNames == null)
+                throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+                    "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+            processResource(jdkClsNames);
+
+            checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
+            checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE);
+            checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
         }
         catch (IOException e) {
             throw new IllegalStateException("Failed to initialize marshaller context.", e);
@@ -60,6 +80,18 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
     }
 
     /**
+     * @param clsName Class name.
+     * @param ldr Class loader used to get properties file.
+     * @param fileName File name.
+     */
+    private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
+        if (!map.containsKey(clsName.hashCode()))
+            throw new IgniteException("Failed to read class name from class names properties file. " +
+                "Make sure class names properties file packaged with ignite binaries is not corrupted " +
+                "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']');
+    }
+
+    /**
      * @param url Resource URL.
      * @throws IOException In case of error.
      */


[08/29] incubator-ignite git commit: # ignite-883

Posted by ak...@apache.org.
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bfc78ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bfc78ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bfc78ea

Branch: refs/heads/ignite-992
Commit: 6bfc78ea6a4559b4ee9059893e0a7f9d4195ad3c
Parents: a3eb572
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 10:04:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 10:04:18 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    | 18 ++++++++--
 .../datastructures/DataStructuresProcessor.java | 36 +++++++++++++++++---
 2 files changed, 48 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3fc50f..27d2dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private Timer updateNtfTimer;
 
+    /** */
+    @GridToStringExclude
+    private GridTimeoutProcessor.CancelableTask starveTask;
+
+    /** */
+    @GridToStringExclude
+    private GridTimeoutProcessor.CancelableTask metricsLogTask;
+
     /** Indicate error on grid stop. */
     @GridToStringExclude
     private boolean errOnStop;
@@ -859,7 +867,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            ctx.timeout().schedule(new Runnable() {
+            starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
@@ -886,7 +894,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            ctx.timeout().schedule(new Runnable() {
+            metricsLogTask = ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
                 @Override public void run() {
@@ -1700,6 +1708,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
+            if (starveTask != null)
+                starveTask.close();
+
+            if (metricsLogTask != null)
+                metricsLogTask.close();
+
             boolean interrupted = false;
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 27f6a29..2138639 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.datastructures;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -145,7 +146,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             dsCacheCtx = atomicsCache.context();
 
             qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
-                null,
+                new DataStructuresEntryFilter(),
                 dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
                 false);
         }
@@ -1051,10 +1052,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+    static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
-            throws CacheEntryListenerException {
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+            if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
+                return evt.getValue() instanceof GridCacheCountDownLatchValue;
+            else {
+                assert evt.getEventType() == EventType.REMOVED : evt;
+
+                return true;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryFilter.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DataStructuresEntryListener implements
+        CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(
+            Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+            throws CacheEntryListenerException
+        {
             for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
                 if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
                     GridCacheInternal val0 = evt.getValue();


[29/29] incubator-ignite git commit: # IGNITE-992 Review.

Posted by ak...@apache.org.
# IGNITE-992 Review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8740b6e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8740b6e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8740b6e7

Branch: refs/heads/ignite-992
Commit: 8740b6e76107eeb2885e9454f670044edf11e619
Parents: e934bca
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Jun 9 12:20:23 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Jun 9 12:20:23 2015 +0700

----------------------------------------------------------------------
 .../internal/util/IgniteExceptionRegistry.java     |  7 +++----
 .../visor/node/VisorNodeSuppressedErrorsTask.java  | 12 +++++++++++-
 .../internal/visor/util/VisorExceptionWrapper.java | 17 ++++++++++-------
 3 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8740b6e7/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
index 8ad3348..ab113d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteExceptionRegistry.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.util;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.visor.util.*;
 
 import java.io.*;
 import java.util.*;
@@ -161,7 +160,7 @@ public class IgniteExceptionRegistry {
 
         /** */
         @GridToStringExclude
-        private final VisorExceptionWrapper error;
+        private final Throwable error;
 
         /** */
         private final long threadId;
@@ -187,7 +186,7 @@ public class IgniteExceptionRegistry {
          */
         public ExceptionInfo(long order, Throwable error, String msg, long threadId, String threadName, long time) {
             this.order = order;
-            this.error = new VisorExceptionWrapper(error);
+            this.error = error;
             this.threadId = threadId;
             this.threadName = threadName;
             this.time = time;
@@ -211,7 +210,7 @@ public class IgniteExceptionRegistry {
         /**
          * @return Suppressed error.
          */
-        public VisorExceptionWrapper error() {
+        public Throwable error() {
             return error;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8740b6e7/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
index 8b39d09..9fc1cc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -83,12 +84,21 @@ public class VisorNodeSuppressedErrorsTask extends VisorMultiNodeTask<Map<UUID,
 
             List<IgniteExceptionRegistry.ExceptionInfo> errors = ignite.context().exceptionRegistry().getErrors(order);
 
+            List<IgniteExceptionRegistry.ExceptionInfo> wrapped = new ArrayList<>(errors.size());
+
             for (IgniteExceptionRegistry.ExceptionInfo error : errors) {
                 if (error.order() > order)
                     order = error.order();
+
+                wrapped.add(new IgniteExceptionRegistry.ExceptionInfo(error.order(),
+                    new VisorExceptionWrapper(error.error()),
+                    error.message(),
+                    error.threadId(),
+                    error.threadName(),
+                    error.time()));
             }
 
-            return new IgniteBiTuple<>(order, errors);
+            return new IgniteBiTuple<>(order, wrapped);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8740b6e7/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
index e253dcf..d2ae0e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java
@@ -21,14 +21,17 @@ package org.apache.ignite.internal.visor.util;
  * Exception wrapper for safe for transferring to Visor.
  */
 public class VisorExceptionWrapper extends Throwable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** Detail message string of this throwable */
     private String detailMsg;
 
     /** Simple class name of base throwable object. */
-    private String classSimpleName;
+    private String clsSimpleName;
 
     /** Class name of base throwable object. */
-    private String className;
+    private String clsName;
 
     /**
      * Wrap throwable by presented on Visor throwable object.
@@ -38,8 +41,8 @@ public class VisorExceptionWrapper extends Throwable {
     public VisorExceptionWrapper(Throwable cause) {
         assert cause != null;
 
-        classSimpleName = cause.getClass().getSimpleName();
-        className = cause.getClass().getName();
+        clsSimpleName = cause.getClass().getSimpleName();
+        clsName = cause.getClass().getName();
 
         detailMsg = cause.getMessage();
 
@@ -56,14 +59,14 @@ public class VisorExceptionWrapper extends Throwable {
      * @return Class simple name of base throwable object.
      */
     public String getClassSimpleName() {
-        return classSimpleName;
+        return clsSimpleName;
     }
 
     /**
      * @return Class name of base throwable object.
      */
     public String getClassName() {
-        return className;
+        return clsName;
     }
 
     /** {@inheritDoc} */
@@ -73,6 +76,6 @@ public class VisorExceptionWrapper extends Throwable {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return (detailMsg != null) ? (className + ": " + detailMsg) : className;
+        return (detailMsg != null) ? (clsName + ": " + detailMsg) : clsName;
     }
 }


[28/29] incubator-ignite git commit: Merge branches 'ignite-992' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-992

Posted by ak...@apache.org.
Merge branches 'ignite-992' and 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-992


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e934bcad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e934bcad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e934bcad

Branch: refs/heads/ignite-992
Commit: e934bcad4235494986a0dd30352210255beb927e
Parents: 0eee664 0fa2853
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Jun 9 12:10:23 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Jun 9 12:10:23 2015 +0700

----------------------------------------------------------------------
 DEVNOTES.txt                                    |  42 +-
 .../apache/ignite/internal/IgniteKernal.java    |  26 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   8 +-
 .../internal/MarshallerContextAdapter.java      |  36 +-
 .../internal/managers/GridManagerAdapter.java   |   9 +
 .../checkpoint/GridCheckpointManager.java       |  52 +-
 .../discovery/GridDiscoveryManager.java         |  28 +-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../GridCachePartitionExchangeManager.java      |  26 +-
 .../processors/cache/GridCacheTtlManager.java   |   9 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  18 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |  40 +-
 .../GridDhtPartitionsExchangeFuture.java        |  50 +-
 .../cache/transactions/IgniteTxManager.java     |   3 -
 .../datastructures/DataStructuresProcessor.java | 107 +++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../timeout/GridSpiTimeoutObject.java           |  73 +++
 .../timeout/GridTimeoutProcessor.java           | 105 +++-
 .../util/nio/GridCommunicationClient.java       |  30 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../util/nio/GridTcpCommunicationClient.java    | 554 -------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   8 -
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  27 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  10 +
 .../ignite/spi/IgniteSpiTimeoutObject.java      |  44 ++
 .../spi/checkpoint/noop/NoopCheckpointSpi.java  |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 438 ++++-----------
 .../tcp/TcpCommunicationSpiMBean.java           |   2 -
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  10 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 156 +-----
 .../IgniteCountDownLatchAbstractSelfTest.java   | 102 ++++
 .../IgniteCacheClientNearCacheExpiryTest.java   | 103 ++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 .../DataStreamerMultinodeCreateCacheTest.java   |  97 ++++
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   4 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   2 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   2 -
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  13 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  25 +
 .../testframework/GridSpiTestContext.java       |  10 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   1 +
 44 files changed, 1073 insertions(+), 1230 deletions(-)
----------------------------------------------------------------------



[14/29] incubator-ignite git commit: # ignite-883

Posted by ak...@apache.org.
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e237b00d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e237b00d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e237b00d

Branch: refs/heads/ignite-992
Commit: e237b00d84039383723e00929b639fa62be03de7
Parents: fb827a77
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 16:12:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 18:10:34 2015 +0300

----------------------------------------------------------------------
 .../util/nio/GridNioRecoveryDescriptor.java     | 11 ++++++---
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  9 ++++---
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 25 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e237b00d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 08a9937..3bc81ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -176,9 +176,11 @@ public class GridNioRecoveryDescriptor {
         while (acked < rcvCnt) {
             GridNioFuture<?> fut = msgFuts.pollFirst();
 
-            assert fut != null;
+            assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
+                ", acked=" + acked +
+                ", desc=" + this + ']';
 
-            assert fut.isDone();
+            assert fut.isDone() : fut;
 
             acked++;
         }
@@ -239,7 +241,10 @@ public class GridNioRecoveryDescriptor {
      * @param rcvCnt Number of messages received by remote node.
      */
     public void onHandshake(long rcvCnt) {
-        ackReceived(rcvCnt);
+        synchronized (this) {
+            if (!nodeLeft)
+                ackReceived(rcvCnt);
+        }
 
         resendCnt = msgFuts.size();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e237b00d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 302b721..0270a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1752,7 +1752,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
             assert lastMsgId != null;
 
-            Collection<TcpDiscoveryAbstractMessage> copy = new ArrayList<>(msgs.size());
+            Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
 
             boolean skip = true;
 
@@ -1762,10 +1762,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                         skip = false;
                 }
                 else
-                    copy.add(msg);
+                    cp.add(msg);
             }
 
-            return !skip ? copy : null;
+            return !skip ? cp : null;
         }
 
         /**
@@ -2704,8 +2704,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 node.aliveCheck(spi.maxMissedClientHbs);
 
                 if (isLocalNodeCoordinator()) {
-                    Collection<TcpDiscoveryAbstractMessage> pending =
-                        pendingMsgs.messages(msg.lastMessageId());
+                    Collection<TcpDiscoveryAbstractMessage> pending = pendingMsgs.messages(msg.lastMessageId());
 
                     if (pending != null) {
                         msg.pendingMessages(pending);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e237b00d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index f7be340..7333020 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -469,6 +469,31 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testClientReconnectOneServerOneClient() throws Exception {
+        clientsPerSrv = 1;
+
+        startServerNodes(1);
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+
+        srvLeftLatch = new CountDownLatch(1);
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        ((TcpDiscoverySpi)G.ignite("client-0").configuration().getDiscoverySpi()).brakeConnection();
+
+        assertFalse(srvFailedLatch.await(2000, TimeUnit.MILLISECONDS));
+
+        assertEquals(1L, srvLeftLatch.getCount());
+
+        checkNodes(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testGetMissedMessagesOnReconnect() throws Exception {
         clientsPerSrv = 1;
 


[25/29] incubator-ignite git commit: Devnotes sprint-5

Posted by ak...@apache.org.
Devnotes sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f1cfd298
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f1cfd298
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f1cfd298

Branch: refs/heads/ignite-992
Commit: f1cfd2985803c8b05462737b847f956ea043341a
Parents: 015afdb
Author: avinogradov <av...@gridgain.com>
Authored: Mon Jun 8 15:17:54 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Mon Jun 8 15:17:54 2015 +0300

----------------------------------------------------------------------
 DEVNOTES.txt | 42 +++++++++++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f1cfd298/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index 1562dc4..b7fea83 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -24,12 +24,14 @@ NOTE: JDK version should be 1.7.0-* or >= 1.8.0-u40.
 
 Ignite Release Instructions
 ===========================
-Use people.apache.org/keys/committer/<username>.asc key to generate KEYS file.
-Execute these commands at source root:
+Use your people.apache.org/keys/committer/<username>.asc key to generate KEYS file.
+Download https://dist.apache.org/repos/dist/release/incubator/ignite/KEYS and append your key using the commands:
 
 gpg --list-sigs <keyname> >> KEYS
 gpg --armor --export <keyname> >> KEYS
 
+Upload modified KEYS file.
+
 Specify gpg profile at settings.xml. It will be used to sign sources and artifacts.
 
 <profile>
@@ -43,24 +45,38 @@ Specify gpg profile at settings.xml. It will be used to sign sources and artifac
 Ensure you have RELEASE (not SNAPSHOT) version at Ignite poms.
 Maven release plugin release:prepare goal can be used to make release tag.
 
-Deploy Ignite release candidate to maven repository and dev-svn:
+Deploy Ignite release candidate to maven repository and dev-svn, make tag:
+
+   Following command deploys Ignite to maven repository, prepares sources and fabric edition binaries.
+      mvn deploy -P apache-release,gpg,release,scala,lgpl,deploy-ignite-site -Dignite.edition=fabric -DskipTests -B
+
+   In case you want to release both fabric and hadoop editions you have to build hadoop edition first using command
+      mvn package -P apache-release,gpg,release,scala,lgpl -Dignite.edition=hadoop -DskipTests -B
+   save /target/bin/*.zip, make "mvn clean" and restore saved files before deploying fabric.
 
-   mvn deploy -P apache-release,gpg,release,scala,lgpl,deploy-ignite-site -Dignite.edition=fabric -DskipTests -B
+   Binary artifact name can be changed by setting additional property -Dignite.zip.pattern. Binary artifact will be
+   created inside /target/bin folder when release profile is used.
 
-   Binary artifact name can be changed by setting additional property -Dignite.zip.pattern.
-   Sources package name is fixed. Sources package zip will be created automatically when apache-release profile used.
+   Sources artifact name is fixed. Sources artifact will be created inside /target dir when apache-release profile is used.
 
-   In case you want to release both fabric and hadoop editions you have to build hadoop first, save /target/bin/*.zip,
-   make "mvn clean" and restore them before deploy step.
+   Nexus staging (repository.apache.org) should be closed with an appropriate comment containing the release version and
+   release candidate number, for example "Apache Ignite 1.0.0-rc7", when mvn deploy has finished.
 
-   Nexus staging (repository.apache.org) should be closed when mvn deploy finished.
-   Checkout https://dist.apache.org/repos/dist/dev/incubator/ignite svn. Create new folder with name equals to released
-   version at svn root. Copy target/site folder content to svn/ignite/<version> folder and commit.
+   Checkout https://dist.apache.org/repos/dist/dev/incubator/ignite svn. Create a release candidate folder whose name
+   equals the release version with an "-rc*" suffix, for example "1.0.0-rc7", at svn root.
+   Copy /target/site folder content to svn/ignite/<rc-version> folder and commit with appropriate comment.
 
-Start vote based on https://dist.apache.org/repos/dist/dev/incubator/ignite/<version>.
+   Make appropriate git tag for release candidate, for example "ignite-1.0.0-incubating-rc7".
 
-Release nexus staging and copy binaries and sources from https://dist.apache.org/repos/dist/dev/incubator/ignite/<version>
+Start vote based on https://dist.apache.org/repos/dist/dev/incubator/ignite/<rc-version>.
+
+Release nexus staging, move binaries and sources from https://dist.apache.org/repos/dist/dev/incubator/ignite/<rc-version>
 to https://dist.apache.org/repos/dist/release/incubator/ignite/<version> when version accepted.
+Use svn mv ^/dev/incubator/ignite/<rc-version> ^/release/incubator/ignite/<version> command for proper moving.
+
+Make appropriate git tag for released version, for example "ignite-1.0.0-incubating".
+
+Send an email to dev@ignite.incubator.apache.org containing the release svn url.
 
 
 JCache TCK compliance


[18/29] incubator-ignite git commit: # ignite-883 minor

Posted by ak...@apache.org.
# ignite-883 minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/730ef104
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/730ef104
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/730ef104

Branch: refs/heads/ignite-992
Commit: 730ef104ad8938c88793c374abd2396cf8899f17
Parents: 0a4e7dd
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 10:19:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 10:19:46 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioRecoveryDescriptor.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730ef104/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 3bc81ea..733ae81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -244,9 +244,9 @@ public class GridNioRecoveryDescriptor {
         synchronized (this) {
             if (!nodeLeft)
                 ackReceived(rcvCnt);
-        }
 
-        resendCnt = msgFuts.size();
+            resendCnt = msgFuts.size();
+        }
     }
 
     /**


[04/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a2e898d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a2e898d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a2e898d

Branch: refs/heads/ignite-992
Commit: 7a2e898dd22fd70396d7e7d19b301c0632281347
Parents: 997d65e 4f3788d
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 12:01:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 2 12:01:00 2015 +0300

----------------------------------------------------------------------
 .../hibernate/CacheHibernatePersonStore.java    | 202 +---------
 .../hibernate/CacheHibernateStoreExample.java   |  17 +
 .../store/jdbc/CacheJdbcPersonStore.java        | 180 ++-------
 .../store/jdbc/CacheJdbcStoreExample.java       |  13 +
 .../store/spring/CacheSpringPersonStore.java    | 128 ++++++
 .../store/spring/CacheSpringStoreExample.java   | 143 +++++++
 .../datagrid/store/spring/package-info.java     |  22 ++
 .../apache/ignite/cache/store/CacheStore.java   |   2 +
 .../ignite/cache/store/CacheStoreSession.java   |  22 ++
 .../cache/store/CacheStoreSessionListener.java  | 133 +++++++
 .../jdbc/CacheJdbcStoreSessionListener.java     | 141 +++++++
 .../configuration/CacheConfiguration.java       |  32 ++
 .../configuration/IgniteConfiguration.java      |  38 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../managers/communication/GridIoManager.java   |  12 +-
 .../processors/cache/GridCacheProcessor.java    |  12 +-
 .../cache/GridCacheSharedContext.java           |  47 ++-
 .../processors/cache/GridCacheUtils.java        |  54 +++
 .../cache/store/CacheOsStoreManager.java        |   1 -
 .../cache/store/CacheStoreManager.java          |   7 +-
 .../store/GridCacheStoreManagerAdapter.java     | 202 ++++++++--
 .../cache/transactions/IgniteTxAdapter.java     |  33 +-
 .../transactions/IgniteTxLocalAdapter.java      | 142 ++++---
 ...cheStoreSessionListenerAbstractSelfTest.java | 315 +++++++++++++++
 ...heStoreSessionListenerLifecycleSelfTest.java | 395 +++++++++++++++++++
 .../CacheJdbcStoreSessionListenerSelfTest.java  | 175 ++++++++
 .../IgniteCrossCacheTxStoreSelfTest.java        | 147 ++++---
 .../loadtests/hashmap/GridCacheTestContext.java |   3 +-
 .../junits/cache/TestCacheSession.java          |  18 +
 .../cache/TestThreadLocalCacheSession.java      |  15 +
 .../junits/common/GridCommonAbstractTest.java   |  24 ++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 .../CacheHibernateStoreSessionListener.java     | 216 ++++++++++
 ...heHibernateStoreSessionListenerSelfTest.java | 228 +++++++++++
 .../testsuites/IgniteHibernateTestSuite.java    |   2 +
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +-
 modules/spring/pom.xml                          |  14 +
 .../spring/CacheSpringStoreSessionListener.java | 207 ++++++++++
 ...CacheSpringStoreSessionListenerSelfTest.java | 197 +++++++++
 .../testsuites/IgniteSpringTestSuite.java       |   3 +
 40 files changed, 3057 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a2e898d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------


[15/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/75006b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/75006b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/75006b80

Branch: refs/heads/ignite-992
Commit: 75006b805a9997354ea9a1c645ad11ab8f9ba631
Parents: e237b00 5f06f57
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 18:11:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 18:11:06 2015 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityProcessor.java         | 23 ++++-
 .../datastreamer/DataStreamerImpl.java          | 92 ++++++++++++++------
 .../DataStreamerMultiThreadedSelfTest.java      | 59 +++++++++----
 .../IgniteMessagingWithClientTest.java          |  2 +
 4 files changed, 131 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[16/29] incubator-ignite git commit: # ignite-883 javadoc

Posted by ak...@apache.org.
# ignite-883 javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50b0b499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50b0b499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50b0b499

Branch: refs/heads/ignite-992
Commit: 50b0b49987a99fd10691346244f7c60b8d2b6ddd
Parents: 75006b8
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 10:05:39 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 10:05:39 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50b0b499/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
index f7fbd27f..b3fc28e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
@@ -20,7 +20,11 @@ package org.apache.ignite.spi;
 import org.apache.ignite.lang.*;
 
 /**
- *
+ * Provides possibility to schedule delayed execution,
+ * see {@link IgniteSpiContext#addTimeoutObject(IgniteSpiTimeoutObject)}.
+ * <p>
+ * Note: all timeout objects are executed in a single dedicated thread, so implementations
+ * of {@link #onTimeout()} should not use time-consuming or blocking methods.
  */
 public interface IgniteSpiTimeoutObject {
     /**


[12/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db576526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db576526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db576526

Branch: refs/heads/ignite-992
Commit: db576526600b965d14fe271ac4624401a5b4c239
Parents: a676152 ead66e7
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 10:02:10 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 10:02:10 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/visor/cache/VisorCache.java  |  2 +-
 .../VisorCacheConfigurationCollectorJob.java     |  6 +++---
 .../internal/visor/cache/VisorCacheMetrics.java  | 19 +++++++++++--------
 .../cache/VisorCacheMetricsCollectorTask.java    | 10 ++++++----
 .../cache/VisorCacheStoreConfiguration.java      |  5 ++---
 5 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------



[24/29] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5

Posted by ak...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2a8e2abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2a8e2abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2a8e2abe

Branch: refs/heads/ignite-992
Commit: 2a8e2abe4081334802f9950757b7d3776a3a6179
Parents: 837462f 015afdb
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 14:55:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 14:55:48 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  26 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   8 +-
 .../internal/MarshallerContextAdapter.java      |  36 +-
 .../internal/managers/GridManagerAdapter.java   |   9 +
 .../checkpoint/GridCheckpointManager.java       |  52 +-
 .../discovery/GridDiscoveryManager.java         |  28 +-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../GridCachePartitionExchangeManager.java      |  26 +-
 .../processors/cache/GridCacheTtlManager.java   |   9 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  18 +-
 .../GridDhtPartitionsExchangeFuture.java        |  50 +-
 .../cache/transactions/IgniteTxManager.java     |   3 -
 .../datastructures/DataStructuresProcessor.java | 107 +++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../timeout/GridSpiTimeoutObject.java           |  73 +++
 .../timeout/GridTimeoutProcessor.java           | 105 +++-
 .../util/nio/GridCommunicationClient.java       |  30 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../util/nio/GridTcpCommunicationClient.java    | 554 -------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   8 -
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  27 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  10 +
 .../ignite/spi/IgniteSpiTimeoutObject.java      |  44 ++
 .../spi/checkpoint/noop/NoopCheckpointSpi.java  |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 438 ++++-----------
 .../tcp/TcpCommunicationSpiMBean.java           |   2 -
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  10 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 156 +-----
 .../IgniteCountDownLatchAbstractSelfTest.java   | 102 ++++
 .../IgniteCacheClientNearCacheExpiryTest.java   | 103 ++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   4 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   2 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   2 -
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  13 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  25 +
 .../testframework/GridSpiTestContext.java       |  10 +
 40 files changed, 920 insertions(+), 1203 deletions(-)
----------------------------------------------------------------------



[27/29] incubator-ignite git commit: # ignite-sprint-5 minor optimization in force keys future

Posted by ak...@apache.org.
# ignite-sprint-5 minor optimization in force keys future


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0fa2853e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0fa2853e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0fa2853e

Branch: refs/heads/ignite-992
Commit: 0fa2853e060fee7c9c8c8484be412e91d30c52da
Parents: ff7827e
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 16:31:31 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 16:31:31 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtForceKeysFuture.java   | 40 +++++++++++++-------
 1 file changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fa2853e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 9637fd1..1d57ef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @return {@code True} if some mapping was added.
      */
     private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
-        Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
-
-        ClusterNode loc = cctx.localNode();
-
-        int curTopVer = topCntr.get();
+        Map<ClusterNode, Set<KeyCacheObject>> mappings = null;
 
         for (KeyCacheObject key : keys)
-            map(key, mappings, exc);
+            mappings = map(key, mappings, exc);
 
         if (isDone())
             return false;
 
         boolean ret = false;
 
-        if (!mappings.isEmpty()) {
+        if (mappings != null) {
+            ClusterNode loc = cctx.localNode();
+
+            int curTopVer = topCntr.get();
+
             preloader.addFuture(this);
 
             trackable = true;
@@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param key Key.
      * @param exc Exclude nodes.
      * @param mappings Mappings.
+     * @return Mappings.
      */
-    private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
+    private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
+        @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
+        Collection<ClusterNode> exc)
+    {
         ClusterNode loc = cctx.localNode();
 
-        int part = cctx.affinity().partition(key);
-
         GridCacheEntryEx e = cctx.dht().peekEx(key);
 
         try {
             if (e != null && !e.isNewLocked()) {
-                if (log.isDebugEnabled())
+                if (log.isDebugEnabled()) {
+                    int part = cctx.affinity().partition(key);
+
                     log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
                         ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
+                }
 
                 // Key has been rebalanced or retrieved already.
-                return;
+                return mappings;
             }
         }
         catch (GridCacheEntryRemovedException ignore) {
@@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     ", locId=" + cctx.nodeId() + ']');
         }
 
+        int part = cctx.affinity().partition(key);
+
         List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
             new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));
 
@@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');
 
             // Key is already rebalanced.
-            return;
+            return mappings;
         }
 
         // Create partition.
@@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
                         key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
 
-                return;
+                return mappings;
             }
 
+            if (mappings == null)
+                mappings = U.newHashMap(keys.size());
+
             Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
 
             assert mappedKeys != null;
@@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                 log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
                     ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
         }
+
+        return mappings;
     }
 
     /**


[02/29] incubator-ignite git commit: # IGNITE-883 Don't create public pool on client node

Posted by ak...@apache.org.
# IGNITE-883 Don't create public pool on client node


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bfae8897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bfae8897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bfae8897

Branch: refs/heads/ignite-992
Commit: bfae889766d92a126a5fc2938b196f83249cc097
Parents: dce6789
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Jun 1 18:10:28 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Jun 1 19:49:54 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bfae8897/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 6e4efb5..d9f6bb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,16 +1447,6 @@ public class IgnitionEx {
                 ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
             }
 
-            execSvc = new IgniteThreadPoolExecutor(
-                "pub-" + cfg.getGridName(),
-                cfg.getPublicThreadPoolSize(),
-                cfg.getPublicThreadPoolSize(),
-                DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
             sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1466,8 +1456,30 @@ public class IgnitionEx {
                 DFLT_SYSTEM_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+            boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
+
+            if (isClientMode) {
+                execSvc = new IgniteThreadPoolExecutor(
+                    "pub-" + cfg.getGridName(),
+                    0,
+                    cfg.getPublicThreadPoolSize(),
+                    2000,
+                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+            }
+            else {
+                execSvc = new IgniteThreadPoolExecutor(
+                    "pub-" + cfg.getGridName(),
+                    cfg.getPublicThreadPoolSize(),
+                    cfg.getPublicThreadPoolSize(),
+                    DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+            }
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.


[03/29] incubator-ignite git commit: # IGNITE-883 Create and use GridTimeoutProcessor.schedule()

Posted by ak...@apache.org.
# IGNITE-883 Create and use GridTimeoutProcessor.schedule()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/997d65e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/997d65e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/997d65e2

Branch: refs/heads/ignite-992
Commit: 997d65e2aeb5d1c7a62a95120876238b0de6d3d4
Parents: bfae889
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Jun 1 18:10:28 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Jun 1 19:50:46 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  28 +----
 .../discovery/GridDiscoveryManager.java         |  28 ++---
 .../timeout/GridTimeoutProcessor.java           | 107 ++++++++++++++++++-
 3 files changed, 117 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..e3fc50f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,14 +167,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private Timer updateNtfTimer;
 
-    /** */
-    @GridToStringExclude
-    private Timer starveTimer;
-
-    /** */
-    @GridToStringExclude
-    private Timer metricsLogTimer;
-
     /** Indicate error on grid stop. */
     @GridToStringExclude
     private boolean errOnStop;
@@ -867,13 +859,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            starveTimer = new Timer("ignite-starvation-checker");
-
-            starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+            ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (!(execSvc instanceof ThreadPoolExecutor))
                         return;
 
@@ -896,13 +886,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            metricsLogTimer = new Timer("ignite-metrics-logger");
-
-            metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
-                /** */
+            ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (log.isInfoEnabled()) {
                         ClusterMetrics m = cluster().localNode().metrics();
 
@@ -1713,13 +1700,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
-            if (starveTimer != null)
-                starveTimer.cancel();
-
-            // Cancel metrics log timer.
-            if (metricsLogTimer != null)
-                metricsLogTimer.cancel();
-
             boolean interrupted = false;
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4ef602e..9b8280e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -165,7 +166,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private final GridLocalMetrics metrics = createMetrics();
 
     /** Metrics update worker. */
-    private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+    private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
 
     /** Custom event listener. */
     private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
@@ -325,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             checkSegmentOnStart();
         }
 
-        new IgniteThread(metricsUpdater).start();
+        metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
 
         spi.setMetricsProvider(createMetricsProvider());
 
@@ -987,11 +988,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         getSpi().setListener(null);
 
         // Stop discovery worker and metrics updater.
+        U.closeQuiet(metricsUpdateTask);
+
         U.cancel(discoWrk);
-        U.cancel(metricsUpdater);
 
         U.join(discoWrk, log);
-        U.join(metricsUpdater, log);
 
         // Stop SPI itself.
         stopSpi();
@@ -1879,28 +1880,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      *
      */
-    private class MetricsUpdater extends GridWorker {
+    private class MetricsUpdater implements Runnable {
         /** */
         private long prevGcTime = -1;
 
         /** */
         private long prevCpuTime = -1;
 
-        /**
-         *
-         */
-        private MetricsUpdater() {
-            super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
-        }
-
         /** {@inheritDoc} */
-        @Override protected void body() throws IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                U.sleep(METRICS_UPDATE_FREQ);
-
-                gcCpuLoad = getGcCpuLoad();
-                cpuLoad = getCpuLoad();
-            }
+        @Override public void run() {
+            gcCpuLoad = getGcCpuLoad();
+            cpuLoad = getCpuLoad();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e9b7717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -24,8 +24,10 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -40,10 +42,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
             /** {@inheritDoc} */
             @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
-                long time1 = o1.endTime();
-                long time2 = o2.endTime();
+                int res = Long.compare(o1.endTime(), o2.endTime());
 
-                return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+                if (res != 0)
+                    return res;
+
+                return o1.timeoutId().compareTo(o2.timeoutId());
             }
         });
 
@@ -98,6 +102,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Schedule the specified timer task for execution at the specified
+     * time with the specified period, in milliseconds.
+     *
+     * @param task Task to execute.
+     * @param delay Delay to first execution in milliseconds.
+     * @param period Period for execution in milliseconds or -1.
+     * @return Cancelable to cancel task.
+     */
+    public CancelableTask schedule(Runnable task, long delay, long period) {
+        assert delay >= 0;
+        assert period > 0 || period == -1;
+
+        CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+        addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
      * @param timeoutObj Timeout object.
      */
     public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +197,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>   timeoutObjsSize: " + timeoutObjs.size());
     }
+
+    /**
+     *
+     */
+    public class CancelableTask implements GridTimeoutObject, Closeable {
+        /** */
+        private final IgniteUuid id = new IgniteUuid();
+
+        /** */
+        private long endTime;
+
+        /** */
+        private final long period;
+
+        /** */
+        private volatile boolean cancel;
+
+        /** */
+        private final Runnable task;
+
+        /**
+         * @param firstTime First time.
+         * @param period Period.
+         * @param task Task to execute.
+         */
+        CancelableTask(Runnable task, long firstTime, long period) {
+            this.task = task;
+            endTime = firstTime;
+            this.period = period;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void onTimeout() {
+            if (cancel)
+                return;
+
+            long startTime = U.currentTimeMillis();
+
+            try {
+                task.run();
+            }
+            finally {
+                long executionTime = U.currentTimeMillis() - startTime;
+
+                if (executionTime > 10) {
+                    U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
+                        "quickly [executionTime=" + executionTime + ']');
+                }
+
+                if (!cancel && period > 0) {
+                    endTime = U.currentTimeMillis() + period;
+
+                    addTimeoutObject(this);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            cancel = true;
+
+            synchronized (this) {
+                // Just waiting for task execution end to make sure that task will not be executed anymore.
+                removeTimeoutObject(this);
+            }
+        }
+    }
 }


[19/29] incubator-ignite git commit: # ignite-883 fix ConcurrentModificationException during DiscoCache initialization from the GridDiscoveryManager.start

Posted by ak...@apache.org.
# ignite-883 fix ConcurrentModificationException during DiscoCache initialization from the GridDiscoveryManager.start


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f4da46cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f4da46cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f4da46cd

Branch: refs/heads/ignite-992
Commit: f4da46cd25b61b3d7b55b6fba3884c8fa05f5cb8
Parents: 730ef10
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 8 10:27:04 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 8 10:27:04 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/managers/discovery/GridDiscoveryManager.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f4da46cd/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 142dbaa..5983553 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -173,7 +173,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         new ConcurrentHashMap8<>();
 
     /** Map of dynamic cache filters. */
-    private Map<String, CachePredicate> registeredCaches = new HashMap<>();
+    private Map<String, CachePredicate> registeredCaches = new ConcurrentHashMap<>();
 
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();


[09/29] incubator-ignite git commit: # ignite-883

Posted by ak...@apache.org.
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ad2f4efc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ad2f4efc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ad2f4efc

Branch: refs/heads/ignite-992
Commit: ad2f4efce373b6fa08953ad030862175527cd009
Parents: 6bfc78e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 13:34:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 13:34:57 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 26 +++++-----
 .../dht/atomic/GridDhtAtomicCache.java          | 18 ++++---
 .../GridDhtPartitionsExchangeFuture.java        | 50 +++++++++++---------
 3 files changed, 53 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad2f4efc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 488227b..3236bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1144,20 +1144,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         /** {@inheritDoc} */
         @Override public void onTimeout() {
-            if (!busyLock.readLock().tryLock())
-                return;
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    if (!busyLock.readLock().tryLock())
+                        return;
 
-            try {
-                if (started.compareAndSet(false, true))
-                    refreshPartitions();
-            }
-            finally {
-                busyLock.readLock().unlock();
+                    try {
+                        if (started.compareAndSet(false, true))
+                            refreshPartitions();
+                    }
+                    finally {
+                        busyLock.readLock().unlock();
 
-                cctx.time().removeTimeoutObject(this);
+                        cctx.time().removeTimeoutObject(ResendTimeoutObject.this);
 
-                pendingResend.compareAndSet(this, null);
-            }
+                        pendingResend.compareAndSet(ResendTimeoutObject.this, null);
+                    }
+                }
+            });
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad2f4efc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9ca80f9..ce91c6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2775,14 +2775,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /** {@inheritDoc} */
         @Override public void onTimeout() {
             if (guard.compareAndSet(false, true)) {
-                writeLock().lock();
+                ctx.closures().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        writeLock().lock();
 
-                try {
-                    finish();
-                }
-                finally {
-                    writeLock().unlock();
-                }
+                        try {
+                            finish();
+                        }
+                        finally {
+                            writeLock().unlock();
+                        }
+                    }
+                });
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad2f4efc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e0bfee6..3362265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1389,30 +1389,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
                 cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
                 @Override public void onTimeout() {
-                    if (isDone())
-                        return;
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            if (isDone())
+                                return;
+
+                            if (!enterBusy())
+                                return;
+
+                            try {
+                                U.warn(log,
+                                    "Retrying preload partition exchange due to timeout [done=" + isDone() +
+                                        ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
+                                        ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
+                                        ", init=" + init + ", initFut=" + initFut.isDone() +
+                                        ", ready=" + ready + ", replied=" + replied + ", added=" + added +
+                                        ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
+                                        oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
+                                        ", locNodeOrder=" + cctx.localNode().order() +
+                                        ", locNodeId=" + cctx.localNode().id() + ']',
+                                    "Retrying preload partition exchange due to timeout.");
 
-                    if (!enterBusy())
-                        return;
-
-                    try {
-                        U.warn(log,
-                            "Retrying preload partition exchange due to timeout [done=" + isDone() +
-                                ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
-                                ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
-                                ", init=" + init + ", initFut=" + initFut.isDone() +
-                                ", ready=" + ready + ", replied=" + replied + ", added=" + added +
-                                ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
-                                oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
-                                ", locNodeOrder=" + cctx.localNode().order() +
-                                ", locNodeId=" + cctx.localNode().id() + ']',
-                            "Retrying preload partition exchange due to timeout.");
-
-                        recheck();
-                    }
-                    finally {
-                        leaveBusy();
-                    }
+                                recheck();
+                            }
+                            finally {
+                                leaveBusy();
+                            }
+                        }
+                    });
                 }
             };
 


[10/29] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1

Posted by ak...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-883_1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3417b3dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3417b3dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3417b3dc

Branch: refs/heads/ignite-992
Commit: 3417b3dc0904708dcbea6396f88737a2aa99fa79
Parents: 6bfc78e c9f7291
Author: sboikov <se...@inria.fr>
Authored: Thu Jun 4 21:28:32 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu Jun 4 21:28:32 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  13 +-
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../managers/communication/GridIoManager.java   | 117 ++++----
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  19 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../processors/hadoop/HadoopTaskContext.java    |  14 +-
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   2 +-
 .../internal/visor/query/VisorQueryJob.java     |   2 +-
 ...niteDynamicCacheWithConfigStartSelfTest.java |  97 +++++++
 .../igfs/IgfsClientCacheSelfTest.java           |   9 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   1 +
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  14 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  70 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
 .../hadoop/SecondaryFileSystemProvider.java     |   4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  20 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  31 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  26 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  48 +++-
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../hadoop/HadoopAbstractSelfTest.java          |  14 +-
 .../hadoop/HadoopCommandLineTest.java           |  14 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 176 +++++++++++-
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../collections/HadoopAbstractMapTest.java      |  12 +
 ...acheConfigurationPrimitiveTypesSelfTest.java | 104 +++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 .../commands/cache/VisorCacheScanCommand.scala  |   2 +-
 38 files changed, 1122 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3417b3dc/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------


[05/29] incubator-ignite git commit: # ignite-883

Posted by ak...@apache.org.
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/873e01bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/873e01bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/873e01bb

Branch: refs/heads/ignite-992
Commit: 873e01bbb61ed5782635fa81e1c9834f261f50fe
Parents: 7a2e898
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 2 12:29:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jun 3 16:52:47 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  38 +-
 .../internal/managers/GridManagerAdapter.java   |   9 +
 .../checkpoint/GridCheckpointManager.java       |  52 ++-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../processors/cache/GridCacheTtlManager.java   |   9 +-
 .../cache/transactions/IgniteTxManager.java     |   3 -
 .../datastructures/DataStructuresProcessor.java |  66 +--
 .../timeout/GridSpiTimeoutObject.java           |  59 +++
 .../timeout/GridTimeoutProcessor.java           |  24 +-
 .../util/nio/GridCommunicationClient.java       |  26 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  23 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  10 +
 .../ignite/spi/IgniteSpiTimeoutObject.java      |  40 ++
 .../spi/checkpoint/noop/NoopCheckpointSpi.java  |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 409 ++++++-------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   1 -
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 156 +------
 .../IgniteCountDownLatchAbstractSelfTest.java   | 102 +++++
 .../IgniteCacheClientNearCacheExpiryTest.java   | 103 +++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 .../testframework/GridSpiTestContext.java       |  10 +
 22 files changed, 642 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index ed33365..5cbe377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,6 +1447,17 @@ public class IgnitionEx {
                 ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
             }
 
+            execSvc = new IgniteThreadPoolExecutor(
+                "pub-" + cfg.getGridName(),
+                cfg.getPublicThreadPoolSize(),
+                cfg.getPublicThreadPoolSize(),
+                DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+            if (!myCfg.isClientMode())
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
+
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
             sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1456,30 +1467,8 @@ public class IgnitionEx {
                 DFLT_SYSTEM_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
-            boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
-
-            if (isClientMode) {
-                execSvc = new IgniteThreadPoolExecutor(
-                    "pub-" + cfg.getGridName(),
-                    0,
-                    cfg.getPublicThreadPoolSize(),
-                    2000,
-                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-            }
-            else {
-                execSvc = new IgniteThreadPoolExecutor(
-                    "pub-" + cfg.getGridName(),
-                    cfg.getPublicThreadPoolSize(),
-                    cfg.getPublicThreadPoolSize(),
-                    DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
-                // Pre-start all threads as they are guaranteed to be needed.
-                ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
-                // Pre-start all threads as they are guaranteed to be needed.
-                ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
-            }
+            // Pre-start all threads as they are guaranteed to be needed.
+            ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -2007,7 +1996,6 @@ public class IgnitionEx {
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setCacheMode(cfg.getCacheMode());
             ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
-            ccfg.setNearConfiguration(new NearCacheConfiguration());
 
             if (cfg.getCacheMode() == PARTITIONED)
                 ccfg.setBackups(cfg.getBackups());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1eb7143..bea4256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -483,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.discovery().tryFailNode(nodeId);
                     }
 
+                    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
+                    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 2e80b6f..ce2a36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     private final GridMessageListener lsnr = new CheckpointRequestListener();
 
     /** */
-    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap;
 
     /** */
-    private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>(
-        MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q);
+    private final Collection<IgniteUuid> closedSess;
 
     /** Grid marshaller. */
     private final Marshaller marsh;
@@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
         super(ctx, ctx.config().getCheckpointSpi());
 
         marsh = ctx.config().getMarshaller();
+
+        if (enabled()) {
+            keyMap = new ConcurrentHashMap8<>();
+
+            closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS,
+                MAX_CLOSED_SESS,
+                0.75f,
+                256,
+                PER_SEGMENT_Q);
+        }
+        else {
+            keyMap = null;
+
+            closedSess = null;
+        }
     }
 
     /** {@inheritDoc} */
@@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Session IDs.
      */
     public Collection<IgniteUuid> sessionIds() {
-        return new ArrayList<>(keyMap.keySet());
+        return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList();
     }
 
     /**
@@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope,
-        long timeout, boolean override) throws IgniteCheckedException {
+    public boolean storeCheckpoint(GridTaskSessionInternal ses,
+        String key,
+        Object state,
+        ComputeTaskSessionScope scope,
+        long timeout,
+        boolean override)
+        throws IgniteCheckedException
+    {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(String key) {
+        if (!enabled())
+            return false;
+
         assert key != null;
 
         boolean rmv = false;
@@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException {
+        if (!enabled())
+            return null;
+
         assert ses != null;
         assert key != null;
 
@@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @param cleanup Whether cleanup or not.
      */
     public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) {
+        if (!enabled())
+            return;
+
         closedSess.add(ses.getId());
 
         // If on task node.
@@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     @Override public void printMemoryStats() {
         X.println(">>>");
         X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']');
-        X.println(">>>  keyMap: " + keyMap.size());
+        X.println(">>>  keyMap: " + (keyMap != null ? keyMap.size() : 0));
     }
 
     /**
@@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
             if (log.isDebugEnabled())
                 log.debug("Received checkpoint request: " + req);
 
+            if (!enabled())
+                return;
+
             IgniteUuid sesId = req.getSessionId();
 
             if (closedSess.contains(sesId)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9d98ce7..4216895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -395,6 +395,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        assert !CU.isUtilityCache(ctx.name());
+        assert !CU.isAtomicsCache(ctx.name());
+        assert !CU.isMarshallerCache(ctx.name());
+
         CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5f9049a..9bd6321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
+        boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
+            !cctx.config().isEagerTtl() ||
+            CU.isAtomicsCache(cctx.name()) ||
+            CU.isMarshallerCache(cctx.name()) ||
+            CU.isUtilityCache(cctx.name()) ||
+            (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
+
+        if (cleanupDisabled)
             return;
 
         cleanupWorker = new CleanupWorker();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4666cca..b6c77f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 collectPendingVersions(dhtTxLoc);
             }
 
-            // 3.1 Call dataStructures manager.
-            cctx.kernalContext().dataStructures().onTxCommitted(tx);
-
             // 4. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 72911af..27f6a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -32,6 +32,7 @@ import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.cache.event.*;
 import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
@@ -40,7 +41,6 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -99,6 +99,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** */
     private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
 
+    /** */
+    private UUID qryId;
+
     /**
      * @param ctx Context.
      */
@@ -112,7 +115,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() {
+    @Override public void onKernalStart() throws IgniteCheckedException {
         if (ctx.config().isDaemon())
             return;
 
@@ -139,10 +142,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             seqView = atomicsCache;
 
-            dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+            dsCacheCtx = atomicsCache.context();
+
+            qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+                null,
+                dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                false);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (qryId != null)
+            dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
+    }
+
     /**
      * Gets a sequence from cache or creates one if it's not cached.
      *
@@ -906,8 +922,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 dsCacheCtx.gate().enter();
 
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheCountDownLatchValue val = cast(dsView.get(key),
-                        GridCacheCountDownLatchValue.class);
+                    GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
                     // Check that count down hasn't been created in other thread yet.
                     GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
@@ -1034,28 +1049,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Transaction committed callback for transaction manager.
      *
-     * @param tx Committed transaction.
      */
-    public <K, V> void onTxCommitted(IgniteInternalTx tx) {
-        if (dsCacheCtx == null)
-            return;
-
-        if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
-            Collection<IgniteTxEntry> entries = tx.writeEntries();
-
-            if (log.isDebugEnabled())
-                log.debug("Committed entries: " + entries);
-
-            for (IgniteTxEntry entry : entries) {
-                // Check updated or created GridCacheInternalKey keys.
-                if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
-
-                    Object val0 = CU.value(entry.value(), entry.context(), false);
+    private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
+                if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
+                    GridCacheInternal val0 = evt.getValue();
 
                     if (val0 instanceof GridCacheCountDownLatchValue) {
+                        GridCacheInternalKey key = evt.getKey();
+
                         // Notify latch on changes.
                         GridCacheRemovable latch = dsMap.get(key);
 
@@ -1067,8 +1073,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                             latch0.onUpdate(val.get());
 
                             if (val.get() == 0 && val.autoDelete()) {
-                                entry.cached().markObsolete(dsCacheCtx.versions().next());
-
                                 dsMap.remove(key);
 
                                 latch.onRemoved();
@@ -1080,11 +1084,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+
                 }
+                else {
+                    assert evt.getEventType() == EventType.REMOVED : evt;
 
-                // Check deleted GridCacheInternal keys.
-                if (entry.op() == DELETE && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+                    GridCacheInternal key = evt.getKey();
 
                     // Entry's val is null if entry deleted.
                     GridCacheRemovable obj = dsMap.remove(key);
@@ -1094,6 +1099,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 }
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryListener.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
new file mode 100644
index 0000000..82267a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.timeout;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Adapts an {@link IgniteSpiTimeoutObject} to {@link GridTimeoutObject} by delegating every call to it.
+ */
+public class GridSpiTimeoutObject implements GridTimeoutObject {
+    /** Wrapped SPI timeout object that all calls are delegated to. */
+    @GridToStringInclude
+    private final IgniteSpiTimeoutObject obj;
+
+    /**
+     * @param obj SPI timeout object to wrap (dereferenced by every method, so presumably never {@code null}).
+     */
+    public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) {
+        this.obj = obj;
+    }
+
+    /** {@inheritDoc} Delegates to the wrapped object's {@code onTimeout()}. */
+    @Override public void onTimeout() {
+        obj.onTimeout();
+    }
+
+    /** {@inheritDoc} Returns the wrapped object's {@code id()}. */
+    @Override public IgniteUuid timeoutId() {
+        return obj.id();
+    }
+
+    /** {@inheritDoc} Returns the wrapped object's {@code endTime()}. */
+    @Override public long endTime() {
+        return obj.endTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final String toString() {
+        return S.toString(GridSpiTimeoutObject.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index e9b7717..e4f370c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
@@ -111,8 +112,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
      * @return Cancelable to cancel task.
      */
     public CancelableTask schedule(Runnable task, long delay, long period) {
-        assert delay >= 0;
-        assert period > 0 || period == -1;
+        assert delay >= 0 : delay;
+        assert period > 0 || period == -1 : period;
 
         CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
 
@@ -203,7 +204,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
      */
     public class CancelableTask implements GridTimeoutObject, Closeable {
         /** */
-        private final IgniteUuid id = new IgniteUuid();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private long endTime;
@@ -215,12 +216,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         private volatile boolean cancel;
 
         /** */
+        @GridToStringInclude
         private final Runnable task;
 
         /**
+         * @param task Task to execute.
          * @param firstTime First time.
          * @param period Period.
-         * @param task Task to execute.
          */
         CancelableTask(Runnable task, long firstTime, long period) {
             this.task = task;
@@ -243,19 +245,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
             if (cancel)
                 return;
 
-            long startTime = U.currentTimeMillis();
-
             try {
                 task.run();
             }
             finally {
-                long executionTime = U.currentTimeMillis() - startTime;
-
-                if (executionTime > 10) {
-                    U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
-                        "quickly [executionTime=" + executionTime + ']');
-                }
-
                 if (!cancel && period > 0) {
                     endTime = U.currentTimeMillis() + period;
 
@@ -273,5 +266,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
                 removeTimeoutObject(this);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CancelableTask.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 31396fb..2f7fd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -38,58 +38,58 @@ public interface GridCommunicationClient {
      * @param handshakeC Handshake.
      * @throws IgniteCheckedException If handshake failed.
      */
-    void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
+    public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
 
     /**
      * @return {@code True} if client has been closed by this call,
      *      {@code false} if failed to close client (due to concurrent reservation or concurrent close).
      */
-    boolean close();
+    public boolean close();
 
     /**
      * Forces client close.
      */
-    void forceClose();
+    public void forceClose();
 
     /**
      * @return {@code True} if client is closed;
      */
-    boolean closed();
+    public boolean closed();
 
     /**
      * @return {@code True} if client was reserved, {@code false} otherwise.
      */
-    boolean reserve();
+    public boolean reserve();
 
     /**
      * Releases this client by decreasing reservations.
      */
-    void release();
+    public void release();
 
     /**
      * @return {@code True} if client was reserved.
      */
-    boolean reserved();
+    public boolean reserved();
 
     /**
      * Gets idle time of this client.
      *
      * @return Idle time of this client.
      */
-    long getIdleTime();
+    public long getIdleTime();
 
     /**
      * @param data Data to send.
      * @throws IgniteCheckedException If failed.
      */
-    void sendMessage(ByteBuffer data) throws IgniteCheckedException;
+    public void sendMessage(ByteBuffer data) throws IgniteCheckedException;
 
     /**
      * @param data Data to send.
      * @param len Length.
      * @throws IgniteCheckedException If failed.
      */
-    void sendMessage(byte[] data, int len) throws IgniteCheckedException;
+    public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
 
     /**
      * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
@@ -97,16 +97,16 @@ public interface GridCommunicationClient {
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if should try to resend message.
      */
-    boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
+    public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
      * @param timeout Timeout.
      * @throws IOException If failed.
      */
-    void flushIfNeeded(long timeout) throws IOException;
+    public void flushIfNeeded(long timeout) throws IOException;
 
     /**
      * @return {@code True} if send is asynchronous.
      */
-    boolean async();
+    public boolean async();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index d3c0587..c9c633f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -542,10 +543,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         return U.spiAttribute(this, attrName);
     }
 
+    /** Passes the given timeout object to {@code addTimeoutObject(..)} of the current SPI context. */
+    protected void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.addTimeoutObject(obj);
+    }
+
+    /** Passes the given timeout object to {@code removeTimeoutObject(..)} of the current SPI context. */
+    protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.removeTimeoutObject(obj);
+    }
+
     /**
      * Temporarily SPI context.
      */
-    private static class GridDummySpiContext implements IgniteSpiContext {
+    private class GridDummySpiContext implements IgniteSpiContext {
         /** */
         private final ClusterNode locNode;
 
@@ -716,5 +727,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         @Override public boolean tryFailNode(UUID nodeId) {
             return false;
         }
+
+        /** {@inheritDoc} Wraps {@code obj} in a {@link GridSpiTimeoutObject} and adds it via {@code context().timeout()}. */
+        @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+            ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
+
+        /** {@inheritDoc} NOTE(review): a fresh wrapper is passed, so removal presumably matches by ID - confirm. */
+        @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+            ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 55f46e5..f83326c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -310,4 +310,14 @@ public interface IgniteSpiContext {
      * @return If node was failed.
      */
     public boolean tryFailNode(UUID nodeId);
+
+    /**
+     * @param c Timeout object to add.
+     */
+    public void addTimeoutObject(IgniteSpiTimeoutObject c);
+
+    /**
+     * @param c Timeout object to remove.
+     */
+    public void removeTimeoutObject(IgniteSpiTimeoutObject c);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
new file mode 100644
index 0000000..f7fbd27f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.lang.*;
+
+/**
+ * Timeout object used by SPIs: it exposes a unique ID, an end time and a callback for the timeout event.
+ */
+public interface IgniteSpiTimeoutObject {
+    /**
+     * @return Unique object ID.
+     */
+    public IgniteUuid id();
+
+    /**
+     * @return End time (presumably an absolute timestamp in milliseconds - confirm against callers).
+     */
+    public long endTime();
+
+    /**
+     * Timeout callback.
+     */
+    public void onTimeout();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
index 460cff3..832d872 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
@@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException {
+    @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 19e54c8..b324ab2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -267,7 +267,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                                 log.debug("Session was closed but there are unacknowledged messages, " +
                                                     "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
 
-                                            recoveryWorker.addReconnectRequest(recoveryData);
+                                            commWorker.addReconnectRequest(recoveryData);
                                         }
                                     }
                                     else
@@ -647,17 +647,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
-    /** Idle client worker. */
-    private IdleClientWorker idleClientWorker;
-
     /** Flush client worker. */
     private ClientFlushWorker clientFlushWorker;
 
-    /** Socket timeout worker. */
-    private SocketTimeoutWorker sockTimeoutWorker;
-
-    /** Recovery worker. */
-    private RecoveryWorker recoveryWorker;
+    /** Recovery and idle clients handler. */
+    private CommunicationWorker commWorker;
 
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -1274,13 +1268,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         nioSrvr.start();
 
-        idleClientWorker = new IdleClientWorker();
-
-        idleClientWorker.start();
-
-        recoveryWorker = new RecoveryWorker();
+        commWorker = new CommunicationWorker();
 
-        recoveryWorker.start();
+        commWorker.start();
 
         if (connBufSize > 0) {
             clientFlushWorker = new ClientFlushWorker();
@@ -1288,10 +1278,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             clientFlushWorker.start();
         }
 
-        sockTimeoutWorker = new SocketTimeoutWorker();
-
-        sockTimeoutWorker.start();
-
         // Ack start.
         if (log.isDebugEnabled())
             log.debug(startInfo());
@@ -1445,15 +1431,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
         U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         // Force closing on stop (safety).
         for (GridCommunicationClient client : clients.values())
@@ -1461,7 +1443,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Clear resources.
         nioSrvr = null;
-        idleClientWorker = null;
+        commWorker = null;
 
         boundTcpPort = -1;
 
@@ -1899,7 +1881,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
@@ -2005,7 +1987,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Ignoring whatever happened after timeout - reporting only timeout event.
             if (!cancelled)
@@ -2041,15 +2023,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
         U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         for (GridCommunicationClient client : clients.values())
             client.forceClose();
@@ -2156,119 +2134,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private class IdleClientWorker extends IgniteSpiThread {
-        /**
-         *
-         */
-        IdleClientWorker() {
-            super(gridName, "nio-idle-client-collector", log);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                cleanupRecovery();
-
-                for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
-                    UUID nodeId = e.getKey();
-
-                    GridCommunicationClient client = e.getValue();
-
-                    ClusterNode node = getSpiContext().node(nodeId);
-
-                    if (node == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Forcing close of non-existent node connection: " + nodeId);
-
-                        client.forceClose();
-
-                        clients.remove(nodeId, client);
-
-                        continue;
-                    }
-
-                    GridNioRecoveryDescriptor recovery = null;
-
-                    if (client instanceof GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
-
-                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
-                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
-
-                            if (log.isDebugEnabled())
-                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
-                                    ", rcvCnt=" + msg.received() + ']');
-
-                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
-
-                            recovery.lastAcknowledged(msg.received());
-
-                            continue;
-                        }
-                    }
-
-                    long idleTime = client.getIdleTime();
-
-                    if (idleTime >= idleConnTimeout) {
-                        if (recovery != null &&
-                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                            !recovery.messagesFutures().isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
-                                    "will wait: " + nodeId);
-
-                            continue;
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Closing idle node connection: " + nodeId);
-
-                        if (client.close() || client.closed())
-                            clients.remove(nodeId, client);
-                    }
-                }
-
-                Thread.sleep(idleConnTimeout);
-            }
-        }
-
-        /**
-         *
-         */
-        private void cleanupRecovery() {
-            Set<ClientKey> left = null;
-
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
-                if (left != null && left.contains(e.getKey()))
-                    continue;
-
-                GridNioRecoveryDescriptor recoverySnd = e.getValue();
-
-                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
-                    if (left == null)
-                        left = new HashSet<>();
-
-                    left.add(e.getKey());
-                }
-            }
-
-            if (left != null) {
-                assert !left.isEmpty();
-
-                for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
-
-                    if (recoverySnd != null)
-                        recoverySnd.onNodeLeft();
-                }
-            }
-        }
-    }
-
-    /**
-     *
-     */
     private class ClientFlushWorker extends IgniteSpiThread {
         /**
          *
@@ -2319,157 +2184,164 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
-     * Handles sockets timeouts.
+     *
      */
-    private class SocketTimeoutWorker extends IgniteSpiThread {
-        /** Time-based sorted set for timeout objects. */
-        private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() {
-                @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) {
-                    long time1 = o1.endTime();
-                    long time2 = o2.endTime();
-
-                    long id1 = o1.id();
-                    long id2 = o2.id();
-
-                    return time1 < time2 ? -1 : time1 > time2 ? 1 :
-                        id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
-                }
-            });
-
-        /** Mutex. */
-        private final Object mux0 = new Object();
+    private class CommunicationWorker extends IgniteSpiThread {
+        /** */
+        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
 
         /**
          *
          */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-comm-sock-timeout-worker", log);
+        private CommunicationWorker() {
+            super(gridName, "tcp-comm-worker", log);
         }
 
-        /**
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Tcp communication worker has been started.");
 
-            timeoutObjs.add(timeoutObj);
+            while (!isInterrupted()) {
+                GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
 
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
+                if (recoveryDesc != null)
+                    processRecovery(recoveryDesc);
+                else
+                    processIdle();
             }
         }
 
         /**
-         * @param timeoutObj Timeout object to remove.
+         *
          */
-        public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
+        private void processIdle() {
+            cleanupRecovery();
 
-            timeoutObjs.remove(timeoutObj);
-        }
+            for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+                UUID nodeId = e.getKey();
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
+                GridCommunicationClient client = e.getValue();
 
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
+                ClusterNode node = getSpiContext().node(nodeId);
 
-                for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    HandshakeTimeoutObject timeoutObj = iter.next();
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing close of non-existent node connection: " + nodeId);
+
+                    client.forceClose();
+
+                    clients.remove(nodeId, client);
+
+                    continue;
+                }
 
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
+                GridNioRecoveryDescriptor recovery = null;
 
-                        timeoutObj.onTimeout();
+                if (client instanceof GridTcpNioCommunicationClient) {
+                    recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+
+                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                ", rcvCnt=" + msg.received() + ']');
+
+                        nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+
+                        recovery.lastAcknowledged(msg.received());
+
+                        continue;
                     }
-                    else
-                        break;
                 }
 
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        HandshakeTimeoutObject first = timeoutObjs.firstx();
+                long idleTime = client.getIdleTime();
 
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
+                if (idleTime >= idleConnTimeout) {
+                    if (recovery != null &&
+                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                        !recovery.messagesFutures().isEmpty()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                "will wait: " + nodeId);
 
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux0.wait(5000);
+                        continue;
                     }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Closing idle node connection: " + nodeId);
+
+                    if (client.close() || client.closed())
+                        clients.remove(nodeId, client);
                 }
             }
         }
-    }
-
-    /**
-     *
-     */
-    private class RecoveryWorker extends IgniteSpiThread {
-        /** */
-        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
 
         /**
          *
          */
-        private RecoveryWorker() {
-            super(gridName, "tcp-comm-recovery-worker", log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Recovery worker has been started.");
+        private void cleanupRecovery() {
+            Set<ClientKey> left = null;
 
-            while (!isInterrupted()) {
-                GridNioRecoveryDescriptor recoveryDesc = q.take();
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+                if (left != null && left.contains(e.getKey()))
+                    continue;
 
-                assert recoveryDesc != null;
+                GridNioRecoveryDescriptor recoverySnd = e.getValue();
 
-                ClusterNode node = recoveryDesc.node();
+                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                    if (left == null)
+                        left = new HashSet<>();
 
-                if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    continue;
+                    left.add(e.getKey());
+                }
+            }
 
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+            if (left != null) {
+                assert !left.isEmpty();
 
-                    GridCommunicationClient client = reserveClient(node);
+                for (ClientKey id : left) {
+                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
 
-                    client.release();
+                    if (recoverySnd != null)
+                        recoverySnd.onNodeLeft();
                 }
-                catch (IgniteCheckedException | IgniteException e) {
-                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, will retry " +
-                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+            }
+        }
 
-                        addReconnectRequest(recoveryDesc);
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, " +
-                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+        /**
+         * @param recoveryDesc Recovery descriptor.
+         */
+        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
+            ClusterNode node = recoveryDesc.node();
 
-                        onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                            e);
-                    }
+            if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+                GridCommunicationClient client = reserveClient(node);
+
+                client.release();
+            }
+            catch (IgniteCheckedException | IgniteException e) {
+                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, will retry " +
+                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                    addReconnectRequest(recoveryDesc);
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, " +
+                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                    onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+                        e);
                 }
             }
         }
@@ -2497,12 +2369,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private static class HandshakeTimeoutObject<T> {
-        /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
+    private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
         /** */
-        private final long id = idGen.incrementAndGet();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private final T obj;
@@ -2533,34 +2402,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return done.compareAndSet(false, true);
         }
 
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
             if (done.compareAndSet(false, true)) {
                 // Close socket - timeout occurred.
                 if (obj instanceof GridCommunicationClient)
                     ((GridCommunicationClient)obj).forceClose();
                 else
                     U.closeQuiet((AbstractInterruptibleChannel)obj);
-
-                return true;
             }
-
-            return false;
         }
 
-        /**
-         * @return End time.
-         */
-        long endTime() {
+        /** {@inheritDoc} */
+        @Override public long endTime() {
             return endTime;
         }
 
-        /**
-         * @return ID.
-         */
-        long id() {
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
             return id;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e672d64..5b66019 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -116,7 +116,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
         b.append("    Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
         b.append("    Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
-        b.append("    Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
 
         b.append(U.nl());
 
@@ -524,11 +523,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         U.interrupt(sockWriter);
         U.interrupt(msgWorker);
-        U.interrupt(spi.sockTimeoutWorker);
 
         U.join(sockWriter, log);
         U.join(msgWorker, log);
-        U.join(spi.sockTimeoutWorker, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 57c13d6..485f57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1412,7 +1412,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
             b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
             b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
-            b.append("    Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 56fb63f..8365716 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -255,9 +255,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Internal and external addresses of local node. */
     protected Collection<InetSocketAddress> locNodeAddrs;
 
-    /** Socket timeout worker. */
-    protected SocketTimeoutWorker sockTimeoutWorker;
-
     /** Start time of the very first grid node. */
     protected volatile long gridStartTime;
 
@@ -1118,7 +1115,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         IOException err = null;
 
@@ -1136,7 +1133,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1180,7 +1177,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         IOException err = null;
 
@@ -1198,7 +1195,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1222,7 +1219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         OutputStream out = sock.getOutputStream();
 
@@ -1240,7 +1237,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1599,9 +1596,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
                 mcastIpFinder.setLocalAddress(locAddr);
         }
 
-        sockTimeoutWorker = new SocketTimeoutWorker();
-        sockTimeoutWorker.start();
-
         impl.spiStart(gridName);
     }
 
@@ -1611,9 +1605,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             // Safety.
             ctxInitLatch.countDown();
 
-        U.interrupt(sockTimeoutWorker);
-        U.join(sockTimeoutWorker, log);
-
         if (ipFinder != null) {
             try {
                 ipFinder.close();
@@ -1754,117 +1745,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
-     * Handles sockets timeouts.
-     */
-    protected class SocketTimeoutWorker extends IgniteSpiThread {
-        /** Time-based sorted set for timeout objects. */
-        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
-                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
-                    int res = Long.compare(o1.endTime(), o2.endTime());
-
-                    if (res != 0)
-                        return res;
-
-                    return Long.compare(o1.id(), o2.id());
-                }
-            });
-
-        /** Mutex. */
-        private final Object mux0 = new Object();
-
-        /**
-         *
-         */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-disco-sock-timeout-worker", log);
-
-            setPriority(threadPri);
-        }
-
-        /**
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
-            timeoutObjs.add(timeoutObj);
-
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
-            }
-        }
-
-        /**
-         * @param timeoutObj Timeout object to remove.
-         */
-        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
-
-            timeoutObjs.remove(timeoutObj);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
-
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
-
-                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    SocketTimeoutObject timeoutObj = iter.next();
-
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
-
-                        if (timeoutObj.onTimeout()) {
-                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
-                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
-
-                            stats.onSocketTimeout();
-                        }
-                    }
-                    else
-                        break;
-                }
-
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        SocketTimeoutObject first = timeoutObjs.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux0.wait(5000);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
      * Socket timeout object.
      */
-    private static class SocketTimeoutObject {
+    private class SocketTimeoutObject implements IgniteSpiTimeoutObject {
         /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
-        /** */
-        private final long id = idGen.incrementAndGet();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private final Socket sock;
@@ -1894,31 +1779,26 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             return done.compareAndSet(false, true);
         }
 
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
             if (done.compareAndSet(false, true)) {
                 // Close socket - timeout occurred.
                 U.closeQuiet(sock);
 
-                return true;
-            }
+                LT.warn(log, null, "Socket write has timed out (consider increasing " +
+                    "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
 
-            return false;
+                stats.onSocketTimeout();
+            }
         }
 
-        /**
-         * @return End time.
-         */
-        long endTime() {
+        /** {@inheritDoc} */
+        @Override public long endTime() {
             return endTime;
         }
 
-        /**
-         * @return ID.
-         */
-        long id() {
+        /** {@inheritDoc} */
+        @Override public  IgniteUuid id() {
             return id;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 0f2a898..80e6123 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -28,6 +28,7 @@ import org.jetbrains.annotations.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static java.util.concurrent.TimeUnit.*;
 
@@ -258,6 +259,107 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
         checkRemovedLatch(latch);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLatchMultinode1() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteCountDownLatch latch = grid(0).countDownLatch("l1", 10,
+            true,
+            true);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        final AtomicBoolean countedDown = new AtomicBoolean();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCountDownLatch latch = ignite.countDownLatch("l1", 10,
+                        true,
+                        false);
+
+                    assertNotNull(latch);
+
+                    boolean wait = latch.await(30_000);
+
+                    assertTrue(countedDown.get());
+
+                    assertEquals(0, latch.count());
+
+                    assertTrue(wait);
+
+                    return null;
+                }
+            }));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            if (i == 9)
+                countedDown.set(true);
+
+            latch.countDown();
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLatchMultinode2() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteCountDownLatch latch = grid(0).countDownLatch("l2", gridCount() * 3,
+            true,
+            true);
+
+        assertNotNull(latch);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCountDownLatch latch = ignite.countDownLatch("l2", 10,
+                        true,
+                        false);
+
+                    assertNotNull(latch);
+
+                    for (int i = 0; i < 3; i++) {
+                        cnt.incrementAndGet();
+
+                        latch.countDown();
+                    }
+
+                    boolean wait = latch.await(30_000);
+
+                    assertEquals(gridCount() * 3, cnt.get());
+
+                    assertEquals(0, latch.count());
+
+                    assertTrue(wait);
+
+                    return null;
+                }
+            }));
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
new file mode 100644
index 0000000..602ac18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.expiry.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
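+ * Checks that entries put via a client node's near cache with an expiry policy expire on that client.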
+ */
+public class IgniteCacheClientNearCacheExpiryTest extends IgniteCacheAbstractTest {
+    /** Total number of grid nodes; the last one is started in client mode. */
+    private static final int NODES = 3;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODES;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.equals(getTestGridName(NODES - 1)))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpirationOnClient() throws Exception {
+        Ignite ignite = grid(NODES - 1);
+
+        assertTrue(ignite.configuration().isClientMode());
+
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        assertTrue(((IgniteCacheProxy)cache).context().isNear());
+
+        for (int i = 0 ; i < 100; i++)
+            cache.put(i, i);
+
+        CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 500));
+
+        IgniteCache<Object, Object> cacheWithExpiry = cache.withExpiryPolicy(plc);
+
+        for (int i = 100 ; i < 200; i++) {
+            cacheWithExpiry.put(i, i);
+
+            assertEquals(i, cacheWithExpiry.localPeek(i));
+        }
+
+        U.sleep(1000);
+
+        for (int i = 0 ; i < 100; i++)
+            assertEquals(i, cacheWithExpiry.localPeek(i));
+
+        for (int i = 100 ; i < 200; i++)
+            assertNull(cache.localPeek(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index c006f69..c78ec5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -50,6 +50,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheTtlCleanupSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheClientNearCacheExpiryTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 5867fb8..21f9424 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -501,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.