You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/24 08:57:14 UTC

[01/16] ignite git commit: IGNITE-5781 Visor throws ClassCastException if cache store implementation is other than CacheJdbcPojoStore

Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 5a663e67f -> 0e26a8fee


IGNITE-5781 Visor throws ClassCastException if cache store implementation is other than CacheJdbcPojoStore

Signed-off-by: Konstantin Boudnik <co...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02e25075
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02e25075
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02e25075

Branch: refs/heads/ignite-5578
Commit: 02e2507598598c09ecf3eaa45e5c9b7c4db74152
Parents: cde3da4
Author: Aleksandr_Meterko <al...@epam.com>
Authored: Wed Jul 19 16:55:15 2017 +0400
Committer: Konstantin Boudnik <ko...@epam.com>
Committed: Wed Jul 19 15:16:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02e25075/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java
old mode 100644
new mode 100755
index e50402c..a03096f
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheJdbcType.java
@@ -64,7 +64,7 @@ public class VisorCacheJdbcType extends VisorDataTransferObject {
     public static List<VisorCacheJdbcType> list(Factory factory) {
         List<VisorCacheJdbcType> res = new ArrayList<>();
 
-        if (factory != null || factory instanceof CacheJdbcPojoStoreFactory) {
+        if (factory instanceof CacheJdbcPojoStoreFactory) {
             CacheJdbcPojoStoreFactory jdbcFactory = (CacheJdbcPojoStoreFactory) factory;
 
             JdbcType[] jdbcTypes = jdbcFactory.getTypes();


[06/16] ignite git commit: Assembly procedure fix

Posted by sb...@apache.org.
Assembly procedure fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db43b0c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db43b0c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db43b0c4

Branch: refs/heads/ignite-5578
Commit: db43b0c40161f8756ab5cad0c1b4b404f0743d8f
Parents: bd7a08e
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jul 20 14:58:09 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jul 20 14:58:09 2017 +0300

----------------------------------------------------------------------
 DEVNOTES.txt | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/db43b0c4/DEVNOTES.txt
----------------------------------------------------------------------
diff --git a/DEVNOTES.txt b/DEVNOTES.txt
index 8a689b8..340153e 100644
--- a/DEVNOTES.txt
+++ b/DEVNOTES.txt
@@ -2,14 +2,14 @@ Ignite Fabric Maven Build Instructions
 ======================================
 1) Optional: build Apache Ignite.NET as described at modules/platforms/dotnet/DEVNOTES.txt.
 
-2) Compile and package:
+2) Compile and install:
 
-  mvn clean package -Pall-java,all-scala,licenses -DskipTests
+  mvn clean install -Pall-java,all-scala,licenses -DskipTests
 
   or if you have built Apache Ignite.NET on the first step use following command:
   (Note that 'doxygen' should be installed before running this command.)
 
-  mvn clean package -Pall-java,all-scala,licenses -DskipTests -DclientDocs
+  mvn clean install -Pall-java,all-scala,licenses -DskipTests -DclientDocs
 
 3) Javadoc generation (optional):
 
@@ -25,14 +25,14 @@ Ignite Fabric with LGPL Maven Build Instructions
 ======================================
 1) Optional: build Apache Ignite.NET as described at modules/platforms/dotnet/DEVNOTES.txt.
 
-2) Compile and package:
+2) Compile and install:
 
-  mvn clean package -Pall-java,all-scala,licenses -DskipTests
+  mvn clean install -Pall-java,all-scala,licenses -DskipTests
 
   or if you have built Apache Ignite.NET on the first step use following command:
   (Note that 'doxygen' should be installed before running this command.)
 
-  mvn clean package -Pall-java,all-scala,licenses -DskipTests -DclientDocs
+  mvn clean install -Pall-java,all-scala,licenses -DskipTests -DclientDocs
 
 3) Javadoc generation with LGPL (optional):
 
@@ -46,15 +46,15 @@ Look for apache-ignite-fabric-lgpl-<version>-bin.zip in ./target/bin directory.
 
 Ignite Hadoop Accelerator Maven Build Instructions
 ============================================
-1) Compile and package:
+1) Compile and install:
 
-    mvn clean package -Pall-java,all-scala,licenses -DskipTests
+    mvn clean install -Pall-java,all-scala,licenses -DskipTests
 
 Use 'hadoop.version' parameter to build Ignite against a specific Hadoop version.
 Use 'spark.version' parameter to build ignite-spark module for a specific Spark version. Version should be >= 2.0.0.
 For example:
 
-    mvn clean package -Pall-java,all-scala,licenses -DskipTests -Dhadoop.version=2.4.2 -Dspark.version=2.1.1
+    mvn clean install -Pall-java,all-scala,licenses -DskipTests -Dhadoop.version=2.4.2 -Dspark.version=2.1.1
 
 2) Assembly Hadoop Accelerator:
   mvn initialize -Prelease -Dignite.edition=hadoop


[04/16] ignite git commit: IGNITE-5775: Fix bug with delay for compute jobs. This closes #2319.

Posted by sb...@apache.org.
IGNITE-5775: Fix bug with delay for compute jobs. This closes #2319.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e285f9db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e285f9db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e285f9db

Branch: refs/heads/ignite-5578
Commit: e285f9dbebc6fd86e81e52828813be9b9d2633f2
Parents: 02e2507
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Thu Jul 20 13:24:25 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jul 20 13:24:25 2017 +0300

----------------------------------------------------------------------
 .../processors/job/GridJobProcessor.java        | 10 +--
 .../internal/IgniteComputeJobOneThreadTest.java | 75 ++++++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java  |  3 +
 3 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 7d2073e..cc8d903 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1794,6 +1794,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         return;
                     }
 
+                    if (!activeJobs.remove(worker.getJobId(), worker))
+                        cancelledJobs.remove(worker.getJobId(), worker);
+
+                    heldJobs.remove(worker.getJobId());
+
                     try {
                         handleCollisions();
                     }
@@ -1801,11 +1806,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         rwLock.readUnlock();
                     }
                 }
-
-                if (!activeJobs.remove(worker.getJobId(), worker))
-                    cancelledJobs.remove(worker.getJobId(), worker);
-
-                heldJobs.remove(worker.getJobId());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
new file mode 100644
index 0000000..76f669e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeJobOneThreadTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test of absence of gaps between jobs in compute
+ */
+public class IgniteComputeJobOneThreadTest extends GridCommonAbstractTest {
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        FifoQueueCollisionSpi colSpi = new FifoQueueCollisionSpi();
+        colSpi.setParallelJobsNumber(1);
+
+        return super.getConfiguration(name)
+            .setMetricsUpdateFrequency(10000)
+            .setCollisionSpi(colSpi);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoTimeout() throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteFuture fut = null;
+
+        for (int i = 0; i < 10000; i++) {
+            fut =  ignite.compute().runAsync(new IgniteRunnable() {
+                @Override public void run() {
+
+                }
+            });
+        }
+
+        fut.get();
+
+        assertTrue(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e285f9db/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 3f3bc53..ac3de73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.GridTaskNameAnnotationSelfTest;
 import org.apache.ignite.internal.GridTaskResultCacheSelfTest;
 import org.apache.ignite.internal.GridTaskTimeoutSelfTest;
 import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest;
+import org.apache.ignite.internal.IgniteComputeJobOneThreadTest;
 import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest;
 import org.apache.ignite.internal.IgniteExecutorServiceTest;
 import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest;
@@ -163,6 +164,8 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(IgniteComputeCustomExecutorConfigurationSelfTest.class);
         suite.addTestSuite(IgniteComputeCustomExecutorSelfTest.class);
 
+        suite.addTestSuite(IgniteComputeJobOneThreadTest.class);
+
         return suite;
     }
 }


[08/16] ignite git commit: IGNITE-3950 Deadlock when exchange starts with pending explicit lock

Posted by sb...@apache.org.
IGNITE-3950 Deadlock when exchange starts with pending explicit lock


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48f29943
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48f29943
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48f29943

Branch: refs/heads/ignite-5578
Commit: 48f29943efa9cbfc1e2c4068f7e16373dec2b0b9
Parents: db43b0c
Author: Vitaliy Biryukov <Bi...@gmail.com>
Authored: Fri Jul 21 15:29:23 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jul 21 15:29:23 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java      | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/48f29943/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
index 6fd5dd3..11b0eea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
@@ -117,8 +117,6 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testExplicitLockManyKeysWithClient() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-3950");
-
         checkExplicitLock(4, true);
     }
 


[09/16] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0adaf6e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0adaf6e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0adaf6e4

Branch: refs/heads/ignite-5578
Commit: 0adaf6e4a04fa6405a5596e583e760871e1092dd
Parents: 48f2994 0d2b989
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Jul 21 15:31:28 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jul 21 15:31:28 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/ml/math/DistanceMeasure.java  |   2 +-
 .../ignite/ml/math/EuclideanDistance.java       |   3 +-
 .../math/decompositions/EigenDecomposition.java |   2 +-
 .../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
 .../ml/math/impls/matrix/AbstractMatrix.java    |   4 +-
 .../ignite/ml/math/impls/matrix/BlockEntry.java |  50 +++
 .../ml/math/impls/matrix/CacheMatrix.java       |   9 +-
 .../matrix/SparseBlockDistributedMatrix.java    | 208 +++++++++
 .../impls/matrix/SparseDistributedMatrix.java   |  26 +-
 .../storage/matrix/BaseBlockMatrixKey.java      |  41 ++
 .../impls/storage/matrix/BlockMatrixKey.java    | 144 ++++++
 .../storage/matrix/BlockMatrixStorage.java      | 435 +++++++++++++++++++
 .../vector/SparseLocalOnHeapVectorStorage.java  |   4 +-
 .../ignite/ml/math/statistics/Variance.java     |   1 +
 .../ignite/ml/math/statistics/package-info.java |  22 +
 .../org/apache/ignite/ml/math/util/MapUtil.java |   2 +-
 .../ignite/ml/math/util/package-info.java       |  22 +
 .../java/org/apache/ignite/ml/package-info.java |  22 +
 .../ml/math/MathImplDistributedTestSuite.java   |   2 +
 .../SparseDistributedBlockMatrixTest.java       | 379 ++++++++++++++++
 .../matrix/SparseDistributedMatrixTest.java     |  32 +-
 21 files changed, 1528 insertions(+), 80 deletions(-)
----------------------------------------------------------------------



[16/16] ignite git commit: 5578

Posted by sb...@apache.org.
5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e26a8fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e26a8fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e26a8fe

Branch: refs/heads/ignite-5578
Commit: 0e26a8feebed0c44eb3cbbc77325a3e17fb481d6
Parents: 530ca72
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:57:06 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:57:06 2017 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionsExchangeFuture.java | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e26a8fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5d40084..3674276 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1226,19 +1226,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         if (cctx.kernalContext().clientNode()) {
-            msg = newGridDhtPartitionsSingleMessage (exchangeId(),
+            msg = new GridDhtPartitionsSingleMessage(exchangeId(),
                 true,
                 null,
                 true);
         }
         else {
-            msg = cctx.exchange().createPartitionsSingleMessage(
-             exchangeId(), false, true);}
+            msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
+                false,
+                true);
 
             Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
-        if (partHistReserved0 != null)
-            msg.partitionHistoryCounters(partHistReserved0);}
+            if (partHistReserved0 != null)
+                msg.partitionHistoryCounters(partHistReserved0);
+        }
 
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);
@@ -1647,7 +1649,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         assert !node.isDaemon() : node;
         assert msg != null;
-        assert exchId.equals(msg.exchangeId()) : msg;assert !cctx.kernalContext().clientNode();
+        assert exchId.equals(msg.exchangeId()) : msg;
+        assert !cctx.kernalContext().clientNode();
 
         if (msg.restoreState()) {
             InitNewCoordinatorFuture newCrdFut0;
@@ -1665,7 +1668,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
 
         if (!msg.client()) {
-if (!msg.client()) {        assert msg.lastVersion() != null : msg;
+            assert msg.lastVersion() != null : msg;
 
             updateLastVersion(msg.lastVersion());
         }


[07/16] ignite git commit: IGNITE-5791 Block matrix introduction

Posted by sb...@apache.org.
IGNITE-5791 Block matrix introduction

This closes #2326


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d2b989d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d2b989d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d2b989d

Branch: refs/heads/ignite-5578
Commit: 0d2b989d2be62533a36061940497a734463b5f10
Parents: db43b0c
Author: Yury Babak <yb...@gridgain.com>
Authored: Fri Jul 21 15:28:21 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 15:28:21 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/ml/math/DistanceMeasure.java  |   2 +-
 .../ignite/ml/math/EuclideanDistance.java       |   3 +-
 .../math/decompositions/EigenDecomposition.java |   2 +-
 .../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
 .../ml/math/impls/matrix/AbstractMatrix.java    |   4 +-
 .../ignite/ml/math/impls/matrix/BlockEntry.java |  50 +++
 .../ml/math/impls/matrix/CacheMatrix.java       |   9 +-
 .../matrix/SparseBlockDistributedMatrix.java    | 208 +++++++++
 .../impls/matrix/SparseDistributedMatrix.java   |  26 +-
 .../storage/matrix/BaseBlockMatrixKey.java      |  41 ++
 .../impls/storage/matrix/BlockMatrixKey.java    | 144 ++++++
 .../storage/matrix/BlockMatrixStorage.java      | 435 +++++++++++++++++++
 .../vector/SparseLocalOnHeapVectorStorage.java  |   4 +-
 .../ignite/ml/math/statistics/Variance.java     |   1 +
 .../ignite/ml/math/statistics/package-info.java |  22 +
 .../org/apache/ignite/ml/math/util/MapUtil.java |   2 +-
 .../ignite/ml/math/util/package-info.java       |  22 +
 .../java/org/apache/ignite/ml/package-info.java |  22 +
 .../ml/math/MathImplDistributedTestSuite.java   |   2 +
 .../SparseDistributedBlockMatrixTest.java       | 379 ++++++++++++++++
 .../matrix/SparseDistributedMatrixTest.java     |  32 +-
 21 files changed, 1528 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
index 09be0c3..df235a7 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/DistanceMeasure.java
@@ -34,5 +34,5 @@ public interface DistanceMeasure extends Externalizable {
      * @return the distance between the two vectors
      * @throws CardinalityException if the array lengths differ.
      */
-    double compute(Vector a, Vector b) throws CardinalityException;
+    public double compute(Vector a, Vector b) throws CardinalityException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
index 5f962ce..edc11dc 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/EuclideanDistance.java
@@ -30,8 +30,7 @@ public class EuclideanDistance implements DistanceMeasure {
     private static final long serialVersionUID = 1717556319784040040L;
 
     /** {@inheritDoc} */
-    @Override
-    public double compute(Vector a, Vector b)
+    @Override public double compute(Vector a, Vector b)
         throws CardinalityException {
         return MatrixUtil.localCopyOf(a).minus(b).kNorm(2.0);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
index d0e91a5..a5c92e6 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/decompositions/EigenDecomposition.java
@@ -446,7 +446,7 @@ public class EigenDecomposition implements Destroyable {
 
         // Store roots isolated by balanc and compute matrix norm
 
-        double norm = h.foldMap(Functions.PLUS, Functions.ABS, 0.0);
+        double norm = h.foldMap(Functions.PLUS, Functions.ABS, 0.0d);
 
         // Outer loop over eigenvalue index
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
index 1bda5e6..369840b 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
@@ -39,11 +39,16 @@ import org.apache.ignite.ml.math.KeyMapper;
 import org.apache.ignite.ml.math.ValueMapper;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.math.functions.IgniteConsumer;
+import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
 import org.apache.ignite.ml.math.functions.IgniteFunction;
-import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.internal.util.typedef.internal.A;
 
 /**
  * Distribution-related misc. support.
+ *
+ * TODO: IGNITE-5102, fix sparse key filters
  */
 public class CacheUtils {
     /**
@@ -127,19 +132,38 @@ public class CacheUtils {
      * @param matrixUuid Matrix UUID.
      * @return Sum obtained using sparse logic.
      */
-    public static <K, V> double sparseSum(IgniteUuid matrixUuid) {
-        Collection<Double> subSums = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
-            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
-            if (entry.getKey().get2().equals(matrixUuid)) {
-                Map<Integer, Double> map = entry.getValue();
+    @SuppressWarnings("unchecked")
+    public static <K, V> double sparseSum(IgniteUuid matrixUuid, String cacheName) {
+        A.notNull(matrixUuid, "matrixUuid");
+        A.notNull(cacheName, "cacheName");
+
+        Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+            V v = ce.entry().getValue();
+
+            double sum = 0.0;
 
-                double sum = sum(map.values());
+            if (v instanceof Map) {
+                Map<Integer, Double> map = (Map<Integer, Double>)v;
 
-                return acc == null ? sum : acc + sum;
+                sum = sum(map.values());
+            }
+            else if (v instanceof BlockEntry) {
+                BlockEntry be = (BlockEntry)v;
+
+                sum = be.sum();
             }
             else
-                return acc;
-        }, key -> key.get2().equals(matrixUuid));
+                throw new UnsupportedOperationException();
+
+            return acc == null ? sum : acc + sum;
+        }, key -> {
+            if (key instanceof BlockMatrixKey)
+                return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+            else if (key instanceof IgniteBiTuple)
+                return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+            else
+                throw new UnsupportedOperationException();
+        });
 
         return sum(subSums);
     }
@@ -186,23 +210,42 @@ public class CacheUtils {
      * @param matrixUuid Matrix UUID.
      * @return Minimum value obtained using sparse logic.
      */
-    public static <K, V> double sparseMin(IgniteUuid matrixUuid) {
-        Collection<Double> mins = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
-            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
+    @SuppressWarnings("unchecked")
+    public static <K, V> double sparseMin(IgniteUuid matrixUuid, String cacheName) {
+        A.notNull(matrixUuid, "matrixUuid");
+        A.notNull(cacheName, "cacheName");
 
-            if (entry.getKey().get2().equals(matrixUuid)) {
-                Map<Integer, Double> map = entry.getValue();
+        Collection<Double> mins = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+            V v = ce.entry().getValue();
 
-                double min = Collections.min(map.values());
+            double min;
 
-                if (acc == null)
-                    return min;
-                else
-                    return Math.min(acc, min);
+            if (v instanceof Map) {
+                Map<Integer, Double> map = (Map<Integer, Double>)v;
+
+                min = Collections.min(map.values());
+            }
+            else if (v instanceof BlockEntry) {
+                BlockEntry be = (BlockEntry)v;
+
+                min = be.minValue();
             }
             else
-                return acc;
-        }, key -> key.get2().equals(matrixUuid));
+                throw new UnsupportedOperationException();
+
+            if (acc == null)
+                return min;
+            else
+                return Math.min(acc, min);
+
+        }, key -> {
+            if (key instanceof BlockMatrixKey)
+                return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+            else if (key instanceof IgniteBiTuple)
+                return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+            else
+                throw new UnsupportedOperationException();
+        });
 
         return Collections.min(mins);
     }
@@ -211,22 +254,42 @@ public class CacheUtils {
      * @param matrixUuid Matrix UUID.
      * @return Maximum value obtained using sparse logic.
      */
-    public static <K, V> double sparseMax(IgniteUuid matrixUuid) {
-        Collection<Double> maxes = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> {
-            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry();
-            if (entry.getKey().get2().equals(matrixUuid)) {
-                Map<Integer, Double> map = entry.getValue();
+    @SuppressWarnings("unchecked")
+    public static <K, V> double sparseMax(IgniteUuid matrixUuid, String cacheName) {
+        A.notNull(matrixUuid, "matrixUuid");
+        A.notNull(cacheName, "cacheName");
+
+        Collection<Double> maxes = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+            V v = ce.entry().getValue();
 
-                double max = Collections.max(map.values());
+            double max;
 
-                if (acc == null)
-                    return max;
-                else
-                    return Math.max(acc, max);
+            if (v instanceof Map) {
+                Map<Integer, Double> map = (Map<Integer, Double>)v;
+
+                max = Collections.max(map.values());
+            }
+            else if (v instanceof BlockEntry) {
+                BlockEntry be = (BlockEntry)v;
+
+                max = be.maxValue();
             }
             else
-                return acc;
-        }, key -> key.get2().equals(matrixUuid));
+                throw new UnsupportedOperationException();
+
+            if (acc == null)
+                return max;
+            else
+                return Math.max(acc, max);
+
+        }, key -> {
+            if (key instanceof BlockMatrixKey)
+                return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+            else if (key instanceof IgniteBiTuple)
+                return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+            else
+                throw new UnsupportedOperationException();
+        });
 
         return Collections.max(maxes);
     }
@@ -279,17 +342,41 @@ public class CacheUtils {
      * @param matrixUuid Matrix UUID.
      * @param mapper Mapping {@link IgniteFunction}.
      */
-    public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteFunction<Double, Double> mapper) {
-        foreach(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce) -> {
-            IgniteBiTuple k = ce.entry().getKey();
+    @SuppressWarnings("unchecked")
+    public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteDoubleFunction<Double> mapper, String cacheName) {
+        A.notNull(matrixUuid, "matrixUuid");
+        A.notNull(cacheName, "cacheName");
+        A.notNull(mapper, "mapper");
+
+        foreach(cacheName, (CacheEntry<K, V> ce) -> {
+            K k = ce.entry().getKey();
+
+            V v = ce.entry().getValue();
 
-            Map<Integer, Double> v = ce.entry().getValue();
+            if (v instanceof Map) {
+                Map<Integer, Double> map = (Map<Integer, Double>)v;
 
-            for (Map.Entry<Integer, Double> e : v.entrySet())
-                e.setValue(mapper.apply(e.getValue()));
+                for (Map.Entry<Integer, Double> e : (map.entrySet()))
+                    e.setValue(mapper.apply(e.getValue()));
+
+            }
+            else if (v instanceof BlockEntry) {
+                BlockEntry be = (BlockEntry)v;
+
+                be.map(mapper);
+            }
+            else
+                throw new UnsupportedOperationException();
 
             ce.cache().put(k, v);
-        }, key -> key.get2().equals(matrixUuid));
+        }, key -> {
+            if (key instanceof BlockMatrixKey)
+                return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+            else if (key instanceof IgniteBiTuple)
+                return ((IgniteBiTuple<Integer, IgniteUuid>)key).get2().equals(matrixUuid);
+            else
+                throw new UnsupportedOperationException();
+        });
     }
 
     /**
@@ -327,8 +414,7 @@ public class CacheUtils {
 
                 // Iterate over given partition.
                 // Query returns an empty cursor if this partition is not stored on this node.
-                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
-                    (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
+                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
                     fun.accept(new CacheEntry<>(entry, cache));
             }
         });
@@ -387,12 +473,34 @@ public class CacheUtils {
         });
     }
 
+    /**
+     * Distributed version of fold operation.
+     *
+     * @param cacheName Cache name.
+     * @param folder Folder.
+     * @param keyFilter Key filter.
+     * @param accumulator Accumulator.
+     * @param zeroVal Zero value.
+     */
     public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
         IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal) {
         return sparseFold(cacheName, folder, keyFilter, accumulator, zeroVal, null, null, 0,
             false);
     }
 
+    /**
+     * Sparse version of fold. This method is also applicable to sparse zeroes.
+     *
+     * @param cacheName Cache name.
+     * @param folder Folder.
+     * @param keyFilter Key filter.
+     * @param accumulator Accumulator.
+     * @param zeroVal Zero value.
+     * @param defVal Def value.
+     * @param defKey Def key.
+     * @param defValCnt Def value count.
+     * @param isNilpotent Is nilpotent.
+     */
     private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
         IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal, V defVal, K defKey, long defValCnt,
         boolean isNilpotent) {
@@ -411,7 +519,7 @@ public class CacheUtils {
 
             // Use affinity in filter for ScanQuery. Otherwise we accept consumer in each node which is wrong.
             Affinity affinity = ignite.affinity(cacheName);
-            ClusterNode localNode = ignite.cluster().localNode();
+            ClusterNode locNode = ignite.cluster().localNode();
 
             A a = zeroVal;
 
@@ -422,7 +530,7 @@ public class CacheUtils {
                 // Iterate over given partition.
                 // Query returns an empty cursor if this partition is not stored on this node.
                 for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
-                    (k, v) -> affinity.mapPartitionToNode(p) == localNode && (keyFilter == null || keyFilter.apply(k)))))
+                    (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k)))))
                     a = folder.apply(entry, a);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index d1d3904..3dc9b43 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -503,7 +503,7 @@ public abstract class AbstractMatrix implements Matrix {
 
     /** {@inheritDoc} */
     @Override public double determinant() {
-        //TODO: This decomposition should be cached
+        //TODO: IGNITE-5799, This decomposition should be cached
         LUDecomposition dec = new LUDecomposition(this);
         double res = dec.determinant();
         dec.destroy();
@@ -515,7 +515,7 @@ public abstract class AbstractMatrix implements Matrix {
         if (rowSize() != columnSize())
             throw new CardinalityException(rowSize(), columnSize());
 
-        //TODO: This decomposition should be cached
+        //TODO: IGNITE-5799, This decomposition should be cached
         LUDecomposition dec = new LUDecomposition(this);
 
         Matrix res = dec.solve(likeIdentity());

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
new file mode 100644
index 0000000..47f07ce
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import org.apache.ignite.ml.math.Matrix;
+
+/**
+ * Block for {@link SparseBlockDistributedMatrix}.
+ */
+public final class BlockEntry extends SparseLocalOnHeapMatrix {
+    /** Max block size. */
+    public static final int MAX_BLOCK_SIZE = 32;
+
+    /** */
+    public BlockEntry() {
+        // No-op.
+    }
+
+    /** */
+    public BlockEntry(int row, int col) {
+        super(row, col);
+
+        assert col <= MAX_BLOCK_SIZE;
+        assert row <= MAX_BLOCK_SIZE;
+    }
+
+    /** */
+    public BlockEntry(Matrix mtx) {
+        assert mtx.columnSize() <= MAX_BLOCK_SIZE;
+        assert mtx.rowSize() <= MAX_BLOCK_SIZE;
+
+        setStorage(mtx.getStorage());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
index a7f0afc..7f00bcb 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/CacheMatrix.java
@@ -65,7 +65,6 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
 
     /**
      *
-     *
      */
     @SuppressWarnings({"unchecked"})
     private CacheMatrixStorage<K, V> storage() {
@@ -93,7 +92,7 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
      * @param d Value to divide to.
      */
     @Override public Matrix divide(double d) {
-        return mapOverValues((Double v) -> v / d);
+        return mapOverValues(v -> v / d);
     }
 
     /**
@@ -102,7 +101,7 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
      * @param x Value to add.
      */
     @Override public Matrix plus(double x) {
-        return mapOverValues((Double v) -> v + x);
+        return mapOverValues(v -> v + x);
     }
 
     /**
@@ -111,12 +110,12 @@ public class CacheMatrix<K, V> extends AbstractMatrix {
      * @param x Value to multiply to.
      */
     @Override public Matrix times(double x) {
-        return mapOverValues((Double v) -> v * x);
+        return mapOverValues(v -> v * x);
     }
 
     /** {@inheritDoc} */
     @Override public Matrix assign(double val) {
-        return mapOverValues((Double v) -> val);
+        return mapOverValues(v -> val);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
new file mode 100644
index 0000000..b3481f9
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.exceptions.CardinalityException;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
+import org.apache.ignite.ml.math.impls.CacheUtils;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage;
+
+/**
+ * Sparse block distributed matrix. This matrix is represented by 32x32 blocks ({@link BlockEntry}).
+ *
+ * Uses a separate cache with {@link BlockMatrixKey} keys and {@link BlockEntry} values.
+ */
+public class SparseBlockDistributedMatrix extends AbstractMatrix implements StorageConstants {
+    /**
+     *
+     */
+    public SparseBlockDistributedMatrix() {
+        // No-op.
+    }
+
+    /**
+     * @param rows Amount of rows in the matrix.
+     * @param cols Amount of columns in the matrix.
+     */
+    public SparseBlockDistributedMatrix(int rows, int cols) {
+        assert rows > 0;
+        assert cols > 0;
+
+        setStorage(new BlockMatrixStorage(rows, cols));
+    }
+
+    /**
+     * Return the same matrix with updated values (broken contract).
+     *
+     * @param d Value to divide by.
+     */
+    @Override public Matrix divide(double d) {
+        return mapOverValues(v -> v / d);
+    }
+
+    /**
+     * Return the same matrix with updated values (broken contract).
+     *
+     * @param x Value to add.
+     */
+    @Override public Matrix plus(double x) {
+        return mapOverValues(v -> v + x);
+    }
+
+    /**
+     * Return the same matrix with updated values (broken contract).
+     *
+     * @param x Value to multiply by.
+     */
+    @Override public Matrix times(double x) {
+        return mapOverValues(v -> v * x);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @SuppressWarnings({"unchecked"})
+    @Override public Matrix times(final Matrix mtx) {
+        if (mtx == null)
+            throw new IllegalArgumentException("The matrix should be not null.");
+
+        if (columnSize() != mtx.rowSize())
+            throw new CardinalityException(columnSize(), mtx.rowSize());
+
+        SparseBlockDistributedMatrix matrixA = this;
+        SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix)mtx;
+
+        String cacheName = BlockMatrixStorage.ML_BLOCK_CACHE_NAME;
+        SparseBlockDistributedMatrix matrixC = new SparseBlockDistributedMatrix(matrixA.rowSize(), matrixB.columnSize());
+
+        CacheUtils.bcast(BlockMatrixStorage.ML_BLOCK_CACHE_NAME, () -> {
+            Ignite ignite = Ignition.localIgnite();
+            Affinity affinity = ignite.affinity(cacheName);
+
+            IgniteCache<BlockMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName);
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            BlockMatrixStorage storageC = matrixC.storage();
+
+            Map<ClusterNode, Collection<BlockMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys());
+            Collection<BlockMatrixKey> locKeys = keysCToNodes.get(locNode);
+
+            if (locKeys == null)
+                return;
+
+            // compute Cij locally on each node
+            // TODO: IGNITE:5114, exec in parallel
+            locKeys.forEach(key -> {
+                long newBlockId = key.blockId();
+                BlockEntry blockC = null;
+
+                List<BlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId, storageC);
+                List<BlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId, storageC);
+
+                for (int i = 0; i < aRow.size(); i++) {
+                    BlockEntry blockA = aRow.get(i);
+                    BlockEntry blockB = bCol.get(i);
+
+                    BlockEntry tmpBlock = new BlockEntry(blockA.times(blockB));
+
+                    blockC = blockC == null ? tmpBlock : new BlockEntry(blockC.plus(tmpBlock));
+                }
+
+                cache.put(storageC.getCacheKey(newBlockId), blockC);
+            });
+        });
+
+        return matrixC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Matrix assign(double val) {
+        return mapOverValues(v -> val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Matrix map(IgniteDoubleFunction<Double> fun) {
+        return mapOverValues(fun);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double sum() {
+        return CacheUtils.sparseSum(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double maxValue() {
+        return CacheUtils.sparseMax(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double minValue() {
+        return CacheUtils.sparseMin(getUUID(), BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Matrix copy() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Matrix like(int rows, int cols) {
+        return new SparseBlockDistributedMatrix(rows, cols);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Vector likeVector(int crd) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** */
+    private IgniteUuid getUUID() {
+        return ((BlockMatrixStorage)getStorage()).getUUID();
+    }
+
+    /**
+     * @param mapper Mapping function.
+     * @return Matrix with mapped values.
+     */
+    private Matrix mapOverValues(IgniteDoubleFunction<Double> mapper) {
+        CacheUtils.sparseMap(getUUID(), mapper, BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+
+        return this;
+    }
+
+    /**
+     *
+     */
+    private BlockMatrixStorage storage() {
+        return (BlockMatrixStorage)getStorage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index df2ddc4..a86db95 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -23,7 +23,6 @@ import org.apache.ignite.ml.math.StorageConstants;
 import org.apache.ignite.ml.math.Vector;
 import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
 import org.apache.ignite.ml.math.functions.IgniteDoubleFunction;
-import org.apache.ignite.ml.math.functions.IgniteFunction;
 import org.apache.ignite.ml.math.impls.CacheUtils;
 import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
 
@@ -61,10 +60,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
         setStorage(new SparseDistributedMatrixStorage(rows, cols, stoMode, acsMode));
     }
 
-    /**
-     *
-     *
-     */
+    /** */
     private SparseDistributedMatrixStorage storage() {
         return (SparseDistributedMatrixStorage)getStorage();
     }
@@ -75,7 +71,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
      * @param d Value to divide to.
      */
     @Override public Matrix divide(double d) {
-        return mapOverValues((Double v) -> v / d);
+        return mapOverValues(v -> v / d);
     }
 
     /**
@@ -84,7 +80,7 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
      * @param x Value to add.
      */
     @Override public Matrix plus(double x) {
-        return mapOverValues((Double v) -> v + x);
+        return mapOverValues(v -> v + x);
     }
 
     /**
@@ -93,42 +89,42 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo
      * @param x Value to multiply.
      */
     @Override public Matrix times(double x) {
-        return mapOverValues((Double v) -> v * x);
+        return mapOverValues(v -> v * x);
     }
 
     /** {@inheritDoc} */
     @Override public Matrix assign(double val) {
-        return mapOverValues((Double v) -> val);
+        return mapOverValues(v -> val);
     }
 
     /** {@inheritDoc} */
     @Override public Matrix map(IgniteDoubleFunction<Double> fun) {
-        return mapOverValues(fun::apply);
+        return mapOverValues(fun);
     }
 
     /**
      * @param mapper Mapping function.
      * @return Matrix with mapped values.
      */
-    private Matrix mapOverValues(IgniteFunction<Double, Double> mapper) {
-        CacheUtils.sparseMap(getUUID(), mapper);
+    private Matrix mapOverValues(IgniteDoubleFunction<Double> mapper) {
+        CacheUtils.sparseMap(getUUID(), mapper, SparseDistributedMatrixStorage.ML_CACHE_NAME);
 
         return this;
     }
 
     /** {@inheritDoc} */
     @Override public double sum() {
-        return CacheUtils.sparseSum(getUUID());
+        return CacheUtils.sparseSum(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
     }
 
     /** {@inheritDoc} */
     @Override public double maxValue() {
-        return CacheUtils.sparseMax(getUUID());
+        return CacheUtils.sparseMax(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
     }
 
     /** {@inheritDoc} */
     @Override public double minValue() {
-        return CacheUtils.sparseMin(getUUID());
+        return CacheUtils.sparseMin(getUUID(), SparseDistributedMatrixStorage.ML_CACHE_NAME);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
new file mode 100644
index 0000000..74ddfe5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BaseBlockMatrixKey.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+/**
+ * Cache key for blocks in {@link SparseBlockDistributedMatrix}.
+ */
+public interface BaseBlockMatrixKey {
+    /**
+     * @return block id.
+     */
+    public long blockId();
+
+    /**
+     * @return matrix id.
+     */
+    public IgniteUuid matrixId();
+
+    /**
+     * @return affinity key of this block key.
+     */
+    public IgniteUuid affinityKey();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
new file mode 100644
index 0000000..3749f44
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixKey.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Key implementation for {@link BlockEntry}, used by {@link SparseBlockDistributedMatrix}.
+ */
+public class BlockMatrixKey implements BaseBlockMatrixKey, Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** Block ID */
+    private long blockId;
+    /** Matrix ID */
+    private IgniteUuid matrixUuid;
+    /** Block affinity key. */
+    private IgniteUuid affinityKey;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public BlockMatrixKey() {
+        // No-op.
+    }
+
+    /**
+     * Construct matrix block key.
+     *
+     * @param blockId Block id.
+     * @param matrixUuid Matrix uuid.
+     * @param affinityKey Affinity key.
+     */
+    public BlockMatrixKey(long blockId, IgniteUuid matrixUuid, @Nullable IgniteUuid affinityKey) {
+        assert blockId >= 0;
+        assert matrixUuid != null;
+
+        this.blockId = blockId;
+        this.matrixUuid = matrixUuid;
+        this.affinityKey = affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blockId() {
+        return blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid matrixId() {
+        return matrixUuid;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid affinityKey() {
+        return affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, matrixUuid);
+        U.writeGridUuid(out, affinityKey);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        matrixUuid = U.readGridUuid(in);
+        affinityKey = U.readGridUuid(in);
+        blockId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter out = writer.rawWriter();
+
+        BinaryUtils.writeIgniteUuid(out, matrixUuid);
+        BinaryUtils.writeIgniteUuid(out, affinityKey);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader in = reader.rawReader();
+
+        matrixUuid = BinaryUtils.readIgniteUuid(in);
+        affinityKey = BinaryUtils.readIgniteUuid(in);
+        blockId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return matrixUuid.hashCode() + (int)(blockId ^ (blockId >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (obj == null || obj.getClass() != getClass())
+            return false;
+
+        BlockMatrixKey that = (BlockMatrixKey)obj;
+
+        return blockId == that.blockId && matrixUuid.equals(that.matrixUuid) && F.eq(affinityKey, that.affinityKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(BlockMatrixKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
new file mode 100644
index 0000000..6640e5a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/BlockMatrixStorage.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.storage.matrix;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.MatrixStorage;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.impls.CacheUtils;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+import static org.apache.ignite.ml.math.impls.matrix.BlockEntry.MAX_BLOCK_SIZE;
+
+/**
+ * Storage for {@link SparseBlockDistributedMatrix}.
+ * <p>
+ * The matrix is cut into blocks whose edge is at most {@code maxBlockEdge} elements. Blocks are numbered in
+ * row-major order ({@code blockRow * blocksInRow + blockCol}), so valid ids are {@code [0, numberOfBlocks())}.
+ * Every block is stored as one {@link BlockEntry} in the cache {@link #ML_BLOCK_CACHE_NAME} under a
+ * {@link BlockMatrixKey} built from the block id and the storage UUID.
+ */
+public class BlockMatrixStorage extends CacheUtils implements MatrixStorage, StorageConstants {
+    /** Cache name used for all instances of {@link BlockMatrixStorage}. */
+    public static final String ML_BLOCK_CACHE_NAME = "ML_BLOCK_SPARSE_MATRICES_CONTAINER";
+    /** Number of blocks in one block column, i.e. the number of block rows. */
+    private int blocksInCol;
+    /** Number of blocks in one block row, i.e. the number of block columns. */
+    private int blocksInRow;
+    /** Amount of rows in the matrix. */
+    private int rows;
+    /** Amount of columns in the matrix. */
+    private int cols;
+    /** Matrix uuid. */
+    private IgniteUuid uuid;
+    /** Block size about 8 KB of data. */
+    private int maxBlockEdge = MAX_BLOCK_SIZE;
+
+    /** Actual distributed storage. */
+    private IgniteCache<
+        BlockMatrixKey /* Matrix block number with uuid. */,
+        BlockEntry /* Block of matrix, local sparse matrix. */
+        > cache = null;
+
+    /**
+     * No-arg constructor; the fields are populated later by {@link #readExternal(ObjectInput)}.
+     */
+    public BlockMatrixStorage() {
+        // No-op.
+    }
+
+    /**
+     * @param rows Amount of rows in the matrix.
+     * @param cols Amount of columns in the matrix.
+     */
+    public BlockMatrixStorage(int rows, int cols) {
+        assert rows > 0;
+        assert cols > 0;
+
+        this.rows = rows;
+        this.cols = cols;
+
+        this.blocksInRow = blockShift(cols);
+        this.blocksInCol = blockShift(rows);
+
+        cache = newCache();
+
+        uuid = IgniteUuid.randomUuid();
+    }
+
+    /**
+     * @return Cache that holds the blocks of this storage.
+     */
+    public IgniteCache<BlockMatrixKey, BlockEntry> cache() {
+        return cache;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double get(int x, int y) {
+        return matrixGet(x, y);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void set(int x, int y, double v) {
+        matrixSet(x, y, v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int columnSize() {
+        return cols;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int rowSize() {
+        return rows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(rows);
+        out.writeInt(cols);
+        out.writeInt(blocksInRow);
+        out.writeInt(blocksInCol);
+        U.writeGridUuid(out, uuid);
+        out.writeUTF(cache.getName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // Must read the fields in exactly the order writeExternal wrote them.
+        rows = in.readInt();
+        cols = in.readInt();
+        blocksInRow = in.readInt();
+        blocksInCol = in.readInt();
+        uuid = U.readGridUuid(in);
+        cache = ignite().getOrCreateCache(in.readUTF());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isSequentialAccess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDense() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRandomAccess() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDistributed() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isArrayBased() {
+        return false;
+    }
+
+    /** Delete all blocks of this matrix from the cache. */
+    @Override public void destroy() {
+        // Block ids are dense in [0, numberOfBlocks()), so this is exactly the key set of this matrix.
+        Set<BlockMatrixKey> keyset = LongStream.range(0, numberOfBlocks())
+            .mapToObj(this::getCacheKey)
+            .collect(Collectors.toSet());
+
+        cache.clearAll(keyset);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = 1;
+
+        res = res * 37 + cols;
+        res = res * 37 + rows;
+        // uuid and cache are null on an instance created by the no-arg constructor.
+        res = res * 37 + (uuid != null ? uuid.hashCode() : 0);
+        res = res * 37 + (cache != null ? cache.hashCode() : 0);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        BlockMatrixStorage that = (BlockMatrixStorage)obj;
+
+        return rows == that.rows && cols == that.cols
+            && (uuid != null ? uuid.equals(that.uuid) : that.uuid == null)
+            && (cache != null ? cache.equals(that.cache) : that.cache == null);
+    }
+
+    /**
+     * Get storage UUID.
+     *
+     * @return storage UUID.
+     */
+    public IgniteUuid getUUID() {
+        return uuid;
+    }
+
+    /**
+     * Build the cache key for the given block id.
+     *
+     * @param blockId Block id.
+     * @return Key made of the block id, the storage UUID and the affinity key for the block.
+     */
+    public BlockMatrixKey getCacheKey(long blockId) {
+        return new BlockMatrixKey(blockId, uuid, getAffinityKey(blockId));
+    }
+
+    /**
+     * Get the row of blocks of this storage that corresponds to a block of the result storage.
+     *
+     * @param blockId block id in {@code storageC}.
+     * @param storageC result storage.
+     * @return The list of block entries.
+     */
+    public List<BlockEntry> getRowForBlock(long blockId, BlockMatrixStorage storageC) {
+        return getRowForBlock(locBlockFor(blockId, storageC));
+    }
+
+    /**
+     * Get the column of blocks of this storage that corresponds to a block of the result storage.
+     *
+     * @param blockId block id in {@code storageC}.
+     * @param storageC result storage.
+     * @return The list of block entries.
+     */
+    public List<BlockEntry> getColForBlock(long blockId, BlockMatrixStorage storageC) {
+        return getColForBlock(locBlockFor(blockId, storageC));
+    }
+
+    /**
+     * Build a keyset for this matrix storage.
+     *
+     * @return Keys for all block ids in {@code [0, numberOfBlocks())}.
+     */
+    public Collection<BlockMatrixKey> getAllKeys() {
+        long maxBlockId = numberOfBlocks();
+        Collection<BlockMatrixKey> keys = new LinkedList<>();
+
+        for (long id = 0; id < maxBlockId; id++)
+            keys.add(getCacheKey(id));
+
+        return keys;
+    }
+
+    /**
+     * Translate a block id of {@code storageC} into a block id of this storage that lies in the same block row
+     * and, when it exists here, the same block column (otherwise the last block column of this storage).
+     *
+     * @param blockId Block id in {@code storageC}.
+     * @param storageC Result storage.
+     * @return Block id in this storage.
+     */
+    private long locBlockFor(long blockId, BlockMatrixStorage storageC) {
+        // Row-major decomposition: the stride between block rows is the number of blocks in a row.
+        long blockRow = blockId / storageC.blocksInRow;
+        long blockCol = blockId % storageC.blocksInRow;
+
+        return this.blocksInRow * blockRow + Math.min(blockCol, this.blocksInRow - 1);
+    }
+
+    /**
+     * @param blockId Block id in this storage.
+     * @return All blocks of the block row that contains {@code blockId}, left to right.
+     */
+    private List<BlockEntry> getRowForBlock(long blockId) {
+        List<BlockEntry> res = new LinkedList<>();
+
+        long startBlock = blockId - blockId % blocksInRow;
+        long endBlock = startBlock + blocksInRow - 1;
+
+        for (long i = startBlock; i <= endBlock; i++)
+            res.add(getEntryById(i));
+
+        return res;
+    }
+
+    /**
+     * @param blockId Block id in this storage.
+     * @return All blocks of the block column that contains {@code blockId}, top to bottom.
+     */
+    private List<BlockEntry> getColForBlock(long blockId) {
+        List<BlockEntry> res = new LinkedList<>();
+
+        long startBlock = blockId % blocksInRow;
+        long endBlock = startBlock + blocksInRow * (blocksInCol - 1);
+
+        for (long i = startBlock; i <= endBlock; i += blocksInRow)
+            res.add(getEntryById(i));
+
+        return res;
+    }
+
+    /**
+     * Get the block with the given id, or a new empty block of the proper size if the cache has none.
+     *
+     * @param blockId Block id in this storage.
+     * @return Block entry, never {@code null}.
+     */
+    private BlockEntry getEntryById(long blockId) {
+        BlockMatrixKey key = getCacheKey(blockId);
+
+        BlockEntry entry = cache.localPeek(key);
+
+        if (entry == null)
+            entry = cache.get(key);
+
+        if (entry == null) {
+            boolean isLastRow = blockId >= (long)blocksInRow * (blocksInCol - 1);
+            // Last block of a block row has id == blocksInRow - 1 (mod blocksInRow); this also holds for block 0.
+            boolean isLastCol = (blockId + 1) % blocksInRow == 0;
+
+            // Only the last block row/column can be partial, and only if the size is not a multiple of the edge.
+            int blockRows = isLastRow && rows % maxBlockEdge != 0 ? rows % maxBlockEdge : maxBlockEdge;
+            int blockCols = isLastCol && cols % maxBlockEdge != 0 ? cols % maxBlockEdge : maxBlockEdge;
+
+            entry = new BlockEntry(blockRows, blockCols);
+        }
+
+        return entry;
+    }
+
+    /**
+     * @return Total number of blocks needed to cover the matrix.
+     */
+    private long numberOfBlocks() {
+        return (long)blockShift(rowSize()) * blockShift(columnSize());
+    }
+
+    /**
+     * TODO: IGNITE-5646, WIP
+     *
+     * Get affinity key for the given id. Currently always {@code null}.
+     */
+    private IgniteUuid getAffinityKey(long id) {
+        return null;
+    }
+
+    /**
+     * Distributed matrix set.
+     *
+     * @param a Row index.
+     * @param b Column index.
+     * @param v New value to set.
+     */
+    private void matrixSet(int a, int b, double v) {
+        long id = getBlockId(a, b);
+
+        // NOTE(review): the cluster group is resolved from the raw block id, not from the BlockMatrixKey that is
+        // actually stored in the cache - confirm that groupForKey maps it to the primary node of the block.
+        ignite().compute(groupForKey(ML_BLOCK_CACHE_NAME, id)).run(() -> {
+            IgniteCache<BlockMatrixKey, BlockEntry> cache = Ignition.localIgnite().getOrCreateCache(ML_BLOCK_CACHE_NAME);
+
+            BlockMatrixKey key = getCacheKey(id);
+
+            // Local get.
+            BlockEntry block = cache.localPeek(key, CachePeekMode.PRIMARY);
+
+            if (block == null)
+                block = cache.get(key); //Remote entry get.
+
+            if (block == null)
+                block = initBlockFor(a, b);
+
+            // Offset inside the block is relative to the block origin, which is a multiple of maxBlockEdge.
+            block.set(a % maxBlockEdge, b % maxBlockEdge, v);
+
+            cache.put(key, block);
+        });
+    }
+
+    /**
+     * @param x Row index.
+     * @param y Column index.
+     * @return Row-major id of the block that contains element {@code (x, y)}.
+     */
+    private long getBlockId(int x, int y) {
+        return (long)(x / maxBlockEdge) * blockShift(cols) + (y / maxBlockEdge);
+    }
+
+    /**
+     * Create an empty block for the block that contains element {@code (x, y)}.
+     *
+     * @param x Row index.
+     * @param y Column index.
+     * @return New block; smaller than {@code maxBlockEdge} only on the bottom/right border of the matrix.
+     */
+    private BlockEntry initBlockFor(int x, int y) {
+        // Size depends on where the block starts, not on which of its elements is written first.
+        int blockRows = Math.min(maxBlockEdge, rows - (x - x % maxBlockEdge));
+        int blockCols = Math.min(maxBlockEdge, cols - (y - y % maxBlockEdge));
+
+        return new BlockEntry(blockRows, blockCols);
+    }
+
+    /**
+     * @param i Number of elements along one dimension.
+     * @return Number of blocks needed to cover {@code i} elements (ceiling of {@code i / maxBlockEdge}).
+     */
+    private int blockShift(int i) {
+        return (i) / maxBlockEdge + ((i) % maxBlockEdge > 0 ? 1 : 0);
+    }
+
+    /**
+     * Distributed matrix get.
+     *
+     * @param a Row index.
+     * @param b Column index.
+     * @return Matrix value at (a, b) index, {@code 0.0} if the block does not exist.
+     */
+    private double matrixGet(int a, int b) {
+        long id = getBlockId(a, b);
+
+        // NOTE(review): same key-vs-id remark as in matrixSet applies to groupForKey here.
+        return ignite().compute(groupForKey(ML_BLOCK_CACHE_NAME, id)).call(() -> {
+            IgniteCache<BlockMatrixKey, BlockEntry> cache = Ignition.localIgnite().getOrCreateCache(ML_BLOCK_CACHE_NAME);
+
+            BlockMatrixKey key = getCacheKey(id);
+
+            // Local get.
+            BlockEntry block = cache.localPeek(key, CachePeekMode.PRIMARY);
+
+            if (block == null)
+                block = cache.get(key);
+
+            return block == null ? 0.0 : block.get(a % maxBlockEdge, b % maxBlockEdge);
+        });
+    }
+
+    /**
+     * Create new ML cache if needed.
+     *
+     * @return Cache named {@link #ML_BLOCK_CACHE_NAME}, obtained via {@code getOrCreateCache}.
+     */
+    private IgniteCache<BlockMatrixKey, BlockEntry> newCache() {
+        CacheConfiguration<BlockMatrixKey, BlockEntry> cfg = new CacheConfiguration<>();
+
+        // Write to primary.
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+        // Atomic transactions only.
+        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        // No eviction.
+        cfg.setEvictionPolicy(null);
+
+        // No copying of values.
+        cfg.setCopyOnRead(false);
+
+        // Cache is partitioned.
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+
+        // Fixed cache name shared by all block matrices.
+        cfg.setName(ML_BLOCK_CACHE_NAME);
+
+        return Ignition.localIgnite().getOrCreateCache(cfg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
index f2efe74..5145376 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/vector/SparseLocalOnHeapVectorStorage.java
@@ -46,9 +46,7 @@ public class SparseLocalOnHeapVectorStorage implements VectorStorage, StorageCon
         // No-op.
     }
 
-    /**
-     * @param map
-     */
+    /** */
     public SparseLocalOnHeapVectorStorage(Map<Integer, Double> map, boolean copy) {
         assert map.size() > 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
index e406b5b..525e6e9 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/Variance.java
@@ -30,6 +30,7 @@ public class Variance {
     /** */
     private double m2;
 
+    /** */
     public Variance() {
         mean = 0;
         n = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
new file mode 100644
index 0000000..7b65fce
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/statistics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Statistics stuff.
+ */
+package org.apache.ignite.ml.math.statistics;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
index 6c25f0e..9190901 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/MapUtil.java
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- *
+ * Some {@link Map} related utils.
  */
 public class MapUtil {
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
new file mode 100644
index 0000000..2507ee4
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Some math utils.
+ */
+package org.apache.ignite.ml.math.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
new file mode 100644
index 0000000..779581b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Root ML package.
+ */
+package org.apache.ignite.ml;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
index 9899d3b..5dc860c 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplDistributedTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.ml.math;
 
 import org.apache.ignite.ml.math.impls.matrix.CacheMatrixTest;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedBlockMatrixTest;
 import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrixTest;
 import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorageTest;
 import org.apache.ignite.ml.math.impls.vector.CacheVectorTest;
@@ -33,6 +34,7 @@ import org.junit.runners.Suite;
     CacheMatrixTest.class,
     SparseDistributedMatrixStorageTest.class,
     SparseDistributedMatrixTest.class,
+    SparseDistributedBlockMatrixTest.class
 })
 public class MathImplDistributedTestSuite {
     // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
new file mode 100644
index 0000000..1228f05
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedBlockMatrixTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.impls.matrix;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.math.impls.MathTestConstants;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixKey;
+import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+import static org.apache.ignite.ml.math.impls.MathTestConstants.UNEXPECTED_VAL;
+
+/**
+ * Tests for {@link SparseBlockDistributedMatrix}.
+ */
+@GridCommonTest(group = "Distributed Models")
+public class SparseDistributedBlockMatrixTest extends GridCommonAbstractTest {
+    /** Number of nodes in grid */
+    private static final int NODE_COUNT = 3;
+    /** Precision. */
+    private static final double PRECISION = 0.0;
+    /** Grid instance. */
+    private Ignite ignite;
+    /** Matrix rows */
+    private final int rows = MathTestConstants.STORAGE_SIZE;
+    /** Matrix cols */
+    private final int cols = MathTestConstants.STORAGE_SIZE;
+    /** Matrix for tests */
+    private SparseBlockDistributedMatrix cacheMatrix;
+
+    /**
+     * Default constructor. Passes {@code false} to the {@link GridCommonAbstractTest} constructor; the grids used
+     * by the tests are started explicitly in {@link #beforeTestsStarted()}.
+     */
+    public SparseDistributedBlockMatrixTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops every grid started for this test class.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Picks the grid with index {@code NODE_COUNT} as the node the tests run against.
+     */
+    @Override protected void beforeTest() throws Exception {
+        ignite = grid(NODE_COUNT);
+
+        // NOTE(review): this flips the flag on the configuration of an already started node - confirm it has
+        // any effect at this point.
+        ignite.configuration().setPeerClassLoadingEnabled(true);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Destroys the matrix created by the test, if any, and drops the reference to it.
+     */
+    @Override protected void afterTest() throws Exception {
+        if (cacheMatrix == null)
+            return;
+
+        cacheMatrix.destroy();
+
+        cacheMatrix = null;
+    }
+
+    /** Sets every element to a random value and checks that the same value is read back. */
+    public void testGetSet() throws Exception {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        for (int row = 0; row < rows; row++) {
+            for (int col = 0; col < cols; col++) {
+                double exp = Math.random();
+
+                cacheMatrix.set(row, col, exp);
+
+                double act = cacheMatrix.get(row, col);
+
+                assertEquals("Unexpected value for matrix element[" + row + " " + col + "]", exp, act, PRECISION);
+            }
+        }
+    }
+
+    /**
+     * Serializes the matrix with Java serialization, restores it and checks that the restored instance is equal
+     * to the original and still exposes the value that was set before serialization.
+     *
+     * @throws IOException If writing or reading the object stream fails.
+     * @throws ClassNotFoundException If the restored class cannot be resolved.
+     */
+    public void testExternalize() throws IOException, ClassNotFoundException {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        cacheMatrix.set(1, 1, 1.0);
+
+        ByteArrayOutputStream byteArrOutputStream = new ByteArrayOutputStream();
+
+        // Closing the object stream flushes everything into the byte array before it is read back.
+        try (ObjectOutputStream objOutputStream = new ObjectOutputStream(byteArrOutputStream)) {
+            objOutputStream.writeObject(cacheMatrix);
+        }
+
+        SparseBlockDistributedMatrix objRestored;
+
+        try (ObjectInputStream objInputStream =
+            new ObjectInputStream(new ByteArrayInputStream(byteArrOutputStream.toByteArray()))) {
+            objRestored = (SparseBlockDistributedMatrix)objInputStream.readObject();
+        }
+
+        assertTrue(MathTestConstants.VAL_NOT_EQUALS, cacheMatrix.equals(objRestored));
+        // Expected value goes first so that a failure message reports the values the right way round.
+        assertEquals(MathTestConstants.VAL_NOT_EQUALS, 1.0, objRestored.get(1, 1), PRECISION);
+    }
+
+    /** Test simple math. */
+    public void testMath() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+        initMtx(cacheMatrix);
+
+        cacheMatrix.assign(2.0);
+        for (int i = 0; i < cacheMatrix.rowSize(); i++)
+            for (int j = 0; j < cacheMatrix.columnSize(); j++)
+                assertEquals(UNEXPECTED_VAL, 2.0, cacheMatrix.get(i, j), PRECISION);
+
+        cacheMatrix.plus(3.0);
+        for (int i = 0; i < cacheMatrix.rowSize(); i++)
+            for (int j = 0; j < cacheMatrix.columnSize(); j++)
+                assertEquals(UNEXPECTED_VAL, 5.0, cacheMatrix.get(i, j), PRECISION);
+
+        cacheMatrix.times(2.0);
+        for (int i = 0; i < cacheMatrix.rowSize(); i++)
+            for (int j = 0; j < cacheMatrix.columnSize(); j++)
+                assertEquals(UNEXPECTED_VAL, 10.0, cacheMatrix.get(i, j), PRECISION);
+
+        cacheMatrix.divide(10.0);
+        for (int i = 0; i < cacheMatrix.rowSize(); i++)
+            for (int j = 0; j < cacheMatrix.columnSize(); j++)
+                assertEquals(UNEXPECTED_VAL, 1.0, cacheMatrix.get(i, j), PRECISION);
+
+        assertEquals(UNEXPECTED_VAL, cacheMatrix.rowSize() * cacheMatrix.columnSize(), cacheMatrix.sum(), PRECISION);
+    }
+
+    /** Checks minValue and maxValue for all-positive, all-negative and zero-based fillings of the matrix. */
+    public void testMinMax() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        // Values 1 .. rows * cols.
+        for (int r = 0; r < cacheMatrix.rowSize(); r++) {
+            for (int c = 0; c < cacheMatrix.columnSize(); c++)
+                cacheMatrix.set(r, c, r * cols + c + 1);
+        }
+
+        assertEquals(UNEXPECTED_VAL, 1.0, cacheMatrix.minValue(), PRECISION);
+        assertEquals(UNEXPECTED_VAL, rows * cols, cacheMatrix.maxValue(), PRECISION);
+
+        // Values -1 .. -(rows * cols).
+        for (int r = 0; r < cacheMatrix.rowSize(); r++) {
+            for (int c = 0; c < cacheMatrix.columnSize(); c++)
+                cacheMatrix.set(r, c, -1.0 * (r * cols + c + 1));
+        }
+
+        assertEquals(UNEXPECTED_VAL, -rows * cols, cacheMatrix.minValue(), PRECISION);
+        assertEquals(UNEXPECTED_VAL, -1.0, cacheMatrix.maxValue(), PRECISION);
+
+        // Values 0 .. rows * cols - 1.
+        for (int r = 0; r < cacheMatrix.rowSize(); r++) {
+            for (int c = 0; c < cacheMatrix.columnSize(); c++)
+                cacheMatrix.set(r, c, r * cols + c);
+        }
+
+        assertEquals(UNEXPECTED_VAL, 0.0, cacheMatrix.minValue(), PRECISION);
+        assertEquals(UNEXPECTED_VAL, rows * cols - 1.0, cacheMatrix.maxValue(), PRECISION);
+    }
+
+    /** */
+    public void testMap() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+        initMtx(cacheMatrix);
+
+        cacheMatrix.map(i -> 100.0);
+        for (int i = 0; i < cacheMatrix.rowSize(); i++)
+            for (int j = 0; j < cacheMatrix.columnSize(); j++)
+                assertEquals(UNEXPECTED_VAL, 100.0, cacheMatrix.get(i, j), PRECISION);
+    }
+
+    /** Checks that {@code copy()} is rejected with {@link UnsupportedOperationException}. */
+    public void testCopy() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        boolean thrown = false;
+
+        try {
+            cacheMatrix.copy();
+        }
+        catch (UnsupportedOperationException ignored) {
+            thrown = true;
+        }
+
+        if (!thrown)
+            fail("UnsupportedOperationException expected.");
+    }
+
+    /**
+     * Checks that two matrices share the single ML block cache and that destroying one matrix removes only its
+     * own keys from that cache.
+     */
+    public void testCacheBehaviour(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        SparseBlockDistributedMatrix cacheMatrix1 = new SparseBlockDistributedMatrix(rows, cols);
+        SparseBlockDistributedMatrix cacheMatrix2 = new SparseBlockDistributedMatrix(rows, cols);
+
+        initMtx(cacheMatrix1);
+        initMtx(cacheMatrix2);
+
+        Collection<String> cacheNames = ignite.cacheNames();
+
+        // JUnit assertions are used instead of the 'assert' keyword so the checks run even without -ea.
+        assertTrue(cacheNames.contains(BlockMatrixStorage.ML_BLOCK_CACHE_NAME));
+
+        IgniteCache<BlockMatrixKey, Object> cache = ignite.getOrCreateCache(BlockMatrixStorage.ML_BLOCK_CACHE_NAME);
+
+        Set<BlockMatrixKey> keySet1 = buildKeySet(cacheMatrix1);
+        Set<BlockMatrixKey> keySet2 = buildKeySet(cacheMatrix2);
+
+        assertTrue(cache.containsKeys(keySet1));
+        assertTrue(cache.containsKeys(keySet2));
+
+        cacheMatrix2.destroy();
+
+        assertTrue(cache.containsKeys(keySet1));
+        assertFalse(cache.containsKeys(keySet2));
+
+        cacheMatrix1.destroy();
+
+        assertFalse(cache.containsKeys(keySet1));
+    }
+
+    /** Expects {@code like(1, 1)} to return a non-null matrix. */
+    public void testLike() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        assertNotNull(cacheMatrix.like(1, 1));
+    }
+
+    /** Expects {@code likeVector(1)} to throw {@link UnsupportedOperationException}. */
+    public void testLikeVector() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        cacheMatrix = new SparseBlockDistributedMatrix(rows, cols);
+
+        try {
+            cacheMatrix.likeVector(1);
+            fail("UnsupportedOperationException expected."); // Reached only if likeVector() returned normally.
+        }
+        catch (UnsupportedOperationException e) {
+            return; // Expected outcome: the test passes.
+        }
+        fail("UnsupportedOperationException expected."); // Safety net; the try block throws or the catch returns first.
+    }
+
+    /**
+     * Multiplies two square diagonal matrices and checks that the product is diagonal with squared entries.
+     */
+    public void testSquareMatrixTimes(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        int size = 100;
+
+        Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size, size);
+        Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size, size);
+
+        for (int i = 0; i < size; i++) { // Put i on the diagonal of both operands.
+            cacheMatrix1.setX(i, i, i);
+            cacheMatrix2.setX(i, i, i);
+        }
+
+        Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+        for(int i = 0; i < size; i++)
+            for(int j = 0; j < size; j++)
+                if (i == j) // Diagonal cell: expect i * i.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+                else // Off-diagonal cell: expect zero.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+    }
+
+    /**
+     * Multiplies a (size2 x size) matrix by a (size x size2) one and checks the leading size x size cells.
+     */
+    public void testNonSquareMatrixTimes(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        int size = BlockEntry.MAX_BLOCK_SIZE + 1; // One element more than a single block edge.
+        int size2 = BlockEntry.MAX_BLOCK_SIZE * 2 + 1; // One element more than two block edges.
+
+        Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size2, size);
+        Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size, size2);
+
+        for (int i = 0; i < size; i++) { // Put i on the leading diagonal of both operands.
+            cacheMatrix1.setX(i, i, i);
+            cacheMatrix2.setX(i, i, i);
+        }
+
+        Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+        for(int i = 0; i < size; i++)
+            for(int j = 0; j < size; j++)
+                if (i == j) // Diagonal cell: expect i * i.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+                else // Off-diagonal cell: expect zero.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+    }
+
+    /**
+     * Multiplies a (size x size2) matrix by a (size2 x size) one and checks the size x size product.
+     */
+    public void testNonSquareMatrixTimes2(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        int size = BlockEntry.MAX_BLOCK_SIZE + 1; // One element more than a single block edge.
+        int size2 = BlockEntry.MAX_BLOCK_SIZE * 2 + 1; // One element more than two block edges.
+
+        Matrix cacheMatrix1 = new SparseBlockDistributedMatrix(size, size2);
+        Matrix cacheMatrix2 = new SparseBlockDistributedMatrix(size2, size);
+
+        for (int i = 0; i < size; i++) { // Put i on the leading diagonal of both operands.
+            cacheMatrix1.setX(i, i, i);
+            cacheMatrix2.setX(i, i, i);
+        }
+
+        Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+        for(int i = 0; i < size; i++)
+            for(int j = 0; j < size; j++)
+                if (i == j) // Diagonal cell: expect i * i.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, i * i, res.get(i, j), PRECISION);
+                else // Off-diagonal cell: expect zero.
+                    assertEquals(UNEXPECTED_VAL + " for "+ i +":"+ j, 0, res.get(i, j), PRECISION);
+    }
+
+    /** Sets every element of {@code m} to 1.0. */
+    private void initMtx(Matrix m) {
+        for (int i = 0; i < m.rowSize(); i++)
+            for (int j = 0; j < m.columnSize(); j++)
+                m.set(i, j, 1.0);
+    }
+
+    /** Builds the block keys expected for {@code m}: IDs 0..N-1, N being the block count of the rows x cols grid. */
+    private Set<BlockMatrixKey> buildKeySet(SparseBlockDistributedMatrix m){
+        Set<BlockMatrixKey> set = new HashSet<>();
+
+        BlockMatrixStorage storage = (BlockMatrixStorage)m.getStorage();
+
+        IgniteUuid uuid = storage.getUUID();
+        int blkSize = BlockEntry.MAX_BLOCK_SIZE; // Block edge length; presumably what the storage partitions by - confirm.
+        long maxBlock = (rows / blkSize + (rows % blkSize > 0 ? 1 : 0)) * (cols / blkSize + (cols % blkSize > 0 ? 1 : 0));
+
+        for (long i = 0; i < maxBlock; i++)
+            set.add(new BlockMatrixKey(i, uuid, null));
+
+        return set;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d2b989d/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
index a7cd6b5..3fec83c 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
@@ -48,10 +48,10 @@ import static org.apache.ignite.ml.math.impls.MathTestConstants.UNEXPECTED_VAL;
 public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
     /** Number of nodes in grid */
     private static final int NODE_COUNT = 3;
-    /** Cache name. */
-    private static final String CACHE_NAME = "test-cache";
     /** Precision. */
     private static final double PRECISION = 0.0;
+    /** Row and column count of the square matrices used in the multiplication test. */
+    private static final int MATRIX_SIZE = 10;
     /** Grid instance. */
     private Ignite ignite;
     /** Matrix rows */
@@ -90,8 +90,6 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        ignite.destroyCache(CACHE_NAME);
-
         if (cacheMatrix != null) {
             cacheMatrix.destroy();
             cacheMatrix = null;
@@ -166,7 +164,9 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
         assertEquals(UNEXPECTED_VAL, cacheMatrix.rowSize() * cacheMatrix.columnSize(), cacheMatrix.sum(), PRECISION);
     }
 
-    /** */
+    /**
+     * TODO: IGNITE-5102, wrong min/max, wait for fold/map fix
+     */
     public void testMinMax() {
         IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
 
@@ -286,6 +286,28 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest {
     }
 
     /** */
+    public void testMatrixTimes(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        SparseDistributedMatrix cacheMatrix1 = new SparseDistributedMatrix(MATRIX_SIZE, MATRIX_SIZE, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+        SparseDistributedMatrix cacheMatrix2 = new SparseDistributedMatrix(MATRIX_SIZE, MATRIX_SIZE, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
+
+        for (int i = 0; i < MATRIX_SIZE; i++) { // Put i on the diagonal of both operands.
+            cacheMatrix1.setX(i, i, i);
+            cacheMatrix2.setX(i, i, i);
+        }
+
+        Matrix res = cacheMatrix1.times(cacheMatrix2);
+
+        for(int i = 0; i < MATRIX_SIZE; i++)
+            for(int j = 0; j < MATRIX_SIZE; j++)
+                if (i == j) // Diagonal cell: expect i * i.
+                    assertEquals(UNEXPECTED_VAL, i * i, res.get(i, j), PRECISION);
+                else // Off-diagonal cell: expect zero.
+                    assertEquals(UNEXPECTED_VAL, 0, res.get(i, j), PRECISION);
+    }
+
+    /** */
     private void initMtx(Matrix m) {
         for (int i = 0; i < m.rowSize(); i++)
             for (int j = 0; j < m.columnSize(); j++)


[11/16] ignite git commit: IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313

Posted by sb...@apache.org.
IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de0571c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de0571c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de0571c

Branch: refs/heads/ignite-5578
Commit: 6de0571c21ffdb77af7bb1d18e9659126d7f321b
Parents: 199b954
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Jul 21 16:35:43 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:35:43 2017 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           | 93 +++++++++++++-------
 1 file changed, 61 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6de0571c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 897f903..b655ddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -319,7 +319,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         try {
             if (mode == WALMode.BACKGROUND) {
                 if (currHnd != null)
-                    currHnd.flush((FileWALPointer)null);
+                    currHnd.flush((FileWALPointer)null, true);
             }
 
             if (currHnd != null)
@@ -526,7 +526,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             return;
 
         if (mode == WALMode.LOG_ONLY || forceFlush) {
-            cur.flushOrWait(filePtr);
+            cur.flushOrWait(filePtr, false);
 
             return;
         }
@@ -535,7 +535,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (filePtr != null && !cur.needFsync(filePtr))
             return;
 
-        cur.fsync(filePtr);
+        cur.fsync(filePtr, false);
     }
 
     /** {@inheritDoc} */
@@ -1700,12 +1700,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             this.maxSegmentSize = maxSegmentSize;
             this.serializer = serializer;
 
-            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0)));
+            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false));
             written = pos;
             lastFsyncPos = pos;
         }
 
         /**
+         * Checks if current head is a close fake record and returns {@code true} if so.
+         *
+         * @return {@code true} if current head is close record.
+         */
+        private boolean stopped() {
+            return stopped(head.get()); // Delegate to the per-record check using the current head.
+        }
+
+        /**
+         * @param record Record to check.
+         * @return {@code true} if the record is fake close record.
+         */
+        private boolean stopped(WALRecord record) {
+            return record instanceof FakeRecord && ((FakeRecord)record).stop; // Only FakeRecord carries the stop flag.
+        }
+
+        /**
          * @param rec Record to be added to record chain as new {@link #head}
          * @return Pointer or null if roll over to next segment is required or already started by other thread.
          * @throws StorageException If failed.
@@ -1721,9 +1738,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 long nextPos = nextPosition(h);
 
-                // It is important that we read `stop` after `head` in this loop for correct close,
-                // because otherwise we will have a race on the last flush in close.
-                if (nextPos + rec.size() >= maxSegmentSize || stop.get()) {
+                if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
                     // Can not write to this segment, need to switch to the next one.
                     return null;
                 }
@@ -1731,7 +1746,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 int newChainSize = h.chainSize() + rec.size();
 
                 if (newChainSize > tlbSize && !flushed) {
-                    boolean res = h.previous() == null || flush(h);
+                    boolean res = h.previous() == null || flush(h, false);
 
                     if (rec.size() > tlbSize)
                         flushed = res;
@@ -1770,7 +1785,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param ptr Pointer.
          * @throws IgniteCheckedException If failed.
          */
-        private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+        private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
             long expWritten;
 
             if (ptr != null) {
@@ -1783,7 +1798,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             else // We read head position before the flush because otherwise we can get wrong position.
                 expWritten = recordOffset(head.get());
 
-            if (flush(ptr))
+            if (flush(ptr, stop))
                 return;
 
             // Spin-wait for a while before acquiring the lock.
@@ -1810,18 +1825,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException {
+        private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
             if (ptr == null) { // Unconditional flush.
                 for (; ; ) {
                     WALRecord expHead = head.get();
 
                     if (expHead.previous() == null) {
-                        assert expHead instanceof FakeRecord;
+                        FakeRecord frHead = (FakeRecord)expHead;
 
-                        return false;
+                        if (frHead.stop == stop || frHead.stop ||
+                            head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
+                            return false;
                     }
 
-                    if (flush(expHead))
+                    if (flush(expHead, stop))
                         return true;
                 }
             }
@@ -1835,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 if (chainBeginPosition(h) > ptr.fileOffset())
                     return false;
 
-                if (flush(h))
+                if (flush(h, stop))
                     return true; // We are lucky.
             }
         }
@@ -1853,17 +1870,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException {
+        private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
             if (expHead.previous() == null) {
-                assert expHead instanceof FakeRecord;
+                FakeRecord frHead = (FakeRecord)expHead;
 
-                return false;
+                if (stop == frHead.stop)
+                    return false;
             }
 
             // Fail-fast before CAS.
             checkEnvironment();
 
-            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0))))
+            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
                 return false;
 
             // At this point we grabbed the piece of WAL chain.
@@ -1976,7 +1994,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param ptr Pointer to sync.
          * @throws StorageException If failed.
          */
-        private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
+        private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException {
             lock.lock();
 
             try {
@@ -1984,7 +2002,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     if (!needFsync(ptr))
                         return;
 
-                    if (fsyncDelay > 0 && !stop.get()) {
+                    if (fsyncDelay > 0 && !stopped()) {
                         // Delay fsync to collect as many updates as possible: trade latency for throughput.
                         U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
 
@@ -1993,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     }
                 }
 
-                flushOrWait(ptr);
+                flushOrWait(ptr, stop);
 
                 if (lastFsyncPos != written) {
                     assert lastFsyncPos < written; // Fsync position must be behind.
@@ -2031,13 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws StorageException If failed.
          */
         private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
-            if (stop.compareAndSet(false, true)) {
-                // Here we can be sure that no other records will be added and this fsync will be the last.
-                if (mode == WALMode.DEFAULT)
-                    fsync(null);
-                else
-                    flushOrWait(null);
+            if (mode == WALMode.DEFAULT)
+                fsync(null, true);
+            else
+                flushOrWait(null, true);
+
+            assert stopped() : "Segment is not closed after close flush: " + head.get();
 
+            if (stop.compareAndSet(false, true)) {
                 try {
                     int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
 
@@ -2068,8 +2087,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 return true;
             }
-
-            return false;
+            else
+                return false;
         }
 
         /**
@@ -2271,17 +2290,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * Fake record is allowed to have no previous record.
      */
     private static final class FakeRecord extends WALRecord {
+        /** */
+        private final boolean stop;
+
         /**
          * @param pos Position.
          */
-        FakeRecord(FileWALPointer pos) {
+        FakeRecord(FileWALPointer pos, boolean stop) {
             position(pos);
+
+            this.stop = stop;
         }
 
         /** {@inheritDoc} */
         @Override public RecordType type() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public FileWALPointer position() {
+            return (FileWALPointer) super.position();
+        }
     }
 
     /**
@@ -2492,7 +2521,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private void doFlush() {
         final FileWriteHandle hnd = currentHandle();
         try {
-            hnd.flush(hnd.head.get());
+            hnd.flush(hnd.head.get(), false);
         }
         catch (Exception e) {
             U.warn(log, "Failed to flush WAL record queue", e);


[14/16] ignite git commit: Test for cache partitions state, fix for client cache start.

Posted by sb...@apache.org.
Test for cache partitions state, fix for client cache start.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeb9336b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeb9336b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeb9336b

Branch: refs/heads/ignite-5578
Commit: aeb9336b3b161ddfff73f17e41cd453409b84a16
Parents: ca496f6
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:47:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:47:16 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  51 ++-
 .../dht/GridClientPartitionTopology.java        |   7 +-
 .../dht/GridDhtPartitionTopology.java           |  12 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  45 +-
 .../GridDhtPartitionsExchangeFuture.java        | 120 +++---
 .../GridCacheDatabaseSharedManager.java         |   6 +-
 .../CacheLateAffinityAssignmentTest.java        |  36 +-
 .../distributed/CachePartitionStateTest.java    | 410 +++++++++++++++++++
 .../TestCacheNodeExcludingFilter.java           |  53 +++
 .../db/IgnitePdsCacheRestoreTest.java           | 208 ++++++++++
 .../testsuites/IgniteCacheTestSuite6.java       |  38 ++
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   3 +
 12 files changed, 863 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 79ab183..f519b4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -517,6 +517,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
+        for (DynamicCacheDescriptor desc : startDescs) {
+            if (desc.cacheConfiguration().getCacheMode() != LOCAL) {
+                CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                assert grp != null;
+
+                grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+            }
+        }
+
         cctx.cache().initCacheProxies(topVer, null);
 
         cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
@@ -1299,6 +1309,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param grpId Group ID.
+     * @return Group name for debug purpose.
+     */
+    private String debugGroupName(int grpId) {
+        CacheGroupDescriptor desc = caches.group(grpId);
+
+        if (desc != null)
+            return desc.cacheOrGroupName();
+        else // No descriptor found for this ID: fall back to a placeholder that includes the raw ID.
+            return "Unknown group: " + grpId;
+    }
+
+    /**
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
@@ -1396,19 +1419,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * Called on exchange initiated by server node leave.
      *
      * @param fut Exchange future.
+     * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if affinity should be assigned by coordinator.
      */
-    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
         ClusterNode leftNode = fut.discoveryEvent().eventNode();
 
         assert !leftNode.isClient() : leftNode;
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal())
-                continue;
+        if (crd) {
+            // Need initialize CacheGroupHolders if this node become coordinator on this exchange.
+            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                    CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
 
-            grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                    cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                }
+            });
+        }
+        else {
+            forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+                @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                    aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                }
+            });
         }
 
         synchronized (mux) {
@@ -1433,12 +1468,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                 CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
 
-                if (grpHolder != null) {
-                    if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}.
-                        grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
+                if (grpHolder != null)
                     return;
-                }
 
                 // Need initialize holders and affinity if this node became coordinator during this exchange.
                 final Integer grpId = desc.groupId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f4ed517..232ce38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -367,6 +367,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+        throw new UnsupportedOperationException(); // This client topology implementation never creates partitions here.
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtLocalPartition localPartition(int p) {
         return localPartition(p, AffinityTopologyVersion.NONE, false);
     }
@@ -830,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(AffinityAssignment assignment) {
+    @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 5f76d12..d9e04a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -130,6 +130,15 @@ public interface GridDhtPartitionTopology {
         throws GridDhtInvalidPartitionException;
 
     /**
+     * Unconditionally creates partition during restore of persisted partition state.
+     *
+     * @param p Partition ID.
+     * @return Partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException;
+
+    /**
      * @param topVer Topology version at the time of creation.
      * @param p Partition ID.
      * @param create If {@code true}, then partition will be created if it's not there.
@@ -331,6 +340,7 @@ public interface GridDhtPartitionTopology {
      * Callback on exchange done.
      *
      * @param assignment New affinity assignment.
+     * @param updateRebalanceVer {@code True} if the rebalance state needs to be checked.
      */
-    public void onExchangeDone(AffinityAssignment assignment);
+    public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 601da1b..5ef499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -135,9 +135,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-    /** */
-    private volatile boolean treatAllPartAsLoc;
-
     /**
      * @param ctx Cache shared context.
      * @param grp Cache group.
@@ -421,14 +418,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
         throws IgniteCheckedException {
-        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-        treatAllPartAsLoc = exchFut.activateCluster()
-            || (discoEvt.type() == EventType.EVT_NODE_JOINED
-            && discoEvt.eventNode().isLocal()
-            && !ctx.kernalContext().clientNode()
-        );
-
         ClusterNode loc = ctx.localNode();
 
         ctx.database().checkpointReadLock();
@@ -540,8 +529,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
-        treatAllPartAsLoc = false;
-
         boolean changed = false;
 
         int num = grp.affinity().partitions();
@@ -692,6 +679,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         return loc;
     }
 
+    /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+        lock.writeLock().lock(); // Lookup and registration in locParts below both happen under the write lock.
+
+        try {
+            GridDhtLocalPartition part = locParts.get(p);
+
+            if (part != null)
+                return part; // Already registered: reuse the existing instance.
+
+            part = new GridDhtLocalPartition(ctx, grp, p); // This method makes no affinity check before creating.
+
+            locParts.set(p, part);
+
+            ctx.pageStore().onPartitionCreated(grp.groupId(), p); // Called only for a newly created partition.
+
+            return part;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     /**
      * @param p Partition number.
      * @param topVer Topology version.
@@ -731,7 +741,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
 
-                if (!treatAllPartAsLoc && !belongs)
+                if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -741,7 +751,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
 
             if (loc == null) {
-                if (!treatAllPartAsLoc && !belongs)
+                if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -1499,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(AffinityAssignment assignment) {
+    @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
         lock.writeLock().lock();
 
         try {
             if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
                 rebuildDiff(assignment);
+
+            if (updateRebalanceVer)
+                updateRebalanceVersion(assignment.assignment());
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c4a4f83..cdb4bb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -193,9 +193,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** */
-    private boolean clientOnlyExchange;
-
     /** Init timestamp. Used to track the amount of time spent to complete the future. */
     private long initTs;
 
@@ -485,26 +482,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                     }
-                    else {
-                        cctx.activate();
-
-                        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
-                            cctx.cache().cachesToStartOnLocalJoin();
-
-                        if (cctx.database().persistenceEnabled() &&
-                            !cctx.kernalContext().clientNode()) {
-                            List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
-
-                            if (caches != null) {
-                                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
-                                    startDescs.add(c.get1());
-                            }
-
-                            cctx.database().readCheckpointAndRestoreMemory(startDescs);
-                        }
-
-                        cctx.cache().startCachesOnLocalJoin(caches, topVer);
-                    }
+                    else
+                        initCachesOnLocalJoin();
                 }
 
                 exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -571,6 +550,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @throws IgniteCheckedException If failed.
      */
+    private void initCachesOnLocalJoin() throws IgniteCheckedException {
+        cctx.activate();
+
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+            cctx.cache().cachesToStartOnLocalJoin();
+
+        if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { // Server nodes only.
+            List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); // Stays empty when 'caches' is null.
+
+            if (caches != null) {
+                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+                    startDescs.add(c.get1());
+            }
+
+            cctx.database().readCheckpointAndRestoreMemory(startDescs); // Runs before the caches are started below.
+        }
+
+        cctx.cache().startCachesOnLocalJoin(caches, topologyVersion()); // 'caches' may be null here.
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     private void initTopologies() throws IgniteCheckedException {
         cctx.database().checkpointReadLock();
 
@@ -776,7 +778,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             warnNoAffinityNodes();
 
-            centralizedAff = cctx.affinity().onServerLeft(this);
+            centralizedAff = cctx.affinity().onServerLeft(this, crd);
         }
         else
             cctx.affinity().onServerJoin(this, crd);
@@ -788,40 +790,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void clientOnlyExchange() throws IgniteCheckedException {
-        clientOnlyExchange = true;
-
         if (crd != null) {
-            if (crd.isLocal()) {
-                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    boolean updateTop = !grp.isLocal() &&
-                        exchId.topologyVersion().equals(grp.localStartVersion());
-
-                    if (updateTop) {
-                        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                            if (top.groupId() == grp.groupId()) {
-                                GridDhtPartitionFullMap fullMap = top.partitionMap(true);
-
-                                assert fullMap != null;
-
-                                grp.topology().update(topologyVersion(),
-                                    fullMap,
-                                    top.updateCounters(false),
-                                    Collections.<Integer>emptySet());
+            assert !crd.isLocal() : crd;
 
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            else {
-                if (!centralizedAff)
-                    sendLocalPartitions(crd);
+            if (!centralizedAff)
+                sendLocalPartitions(crd);
 
-                initDone();
+            initDone();
 
-                return;
-            }
+            return;
         }
         else {
             if (centralizedAff) { // Last server node failed.
@@ -896,8 +873,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = cctx.snapshot()
-                .tryStartLocalSnapshotOperation(discoEvt);
+            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
 
             if (fut != null) {
                 fut.get();
@@ -1122,6 +1098,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
         assert node != null;
 
+        GridDhtPartitionsSingleMessage msg;
+
         // Reset lost partition before send local partition to coordinator.
         if (exchActions != null) {
             Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1130,22 +1108,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 resetLostPartitions(caches);
         }
 
-        GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
-            node, exchangeId(), clientOnlyExchange, true);
+        if (cctx.kernalContext().clientNode()) {
+            msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+                true,
+                null,
+                true);
+        }
+        else {
+            msg = cctx.exchange().createPartitionsSingleMessage(node,
+                exchangeId(),
+                false,
+                true);
+        }
 
         Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
         if (partHistReserved0 != null)
-            m.partitionHistoryCounters(partHistReserved0);
+            msg.partitionHistoryCounters(partHistReserved0);
 
         if (stateChangeExchange() && changeGlobalStateE != null)
-            m.setError(changeGlobalStateE);
+            msg.setError(changeGlobalStateE);
 
         if (log.isDebugEnabled())
-            log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
+            log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
 
         try {
-            cctx.io().send(node, m, SYSTEM_POOL);
+            cctx.io().send(node, msg, SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())
@@ -1318,7 +1306,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (err == null) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal())
-                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
+                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
             }
         }
 
@@ -1386,10 +1374,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         assert msg != null;
         assert msg.exchangeId().equals(exchId) : msg;
-        assert msg.lastVersion() != null : msg;
 
-        if (!msg.client())
+        if (!msg.client()) {
+            assert msg.lastVersion() != null : msg;
+
             updateLastVersion(msg.lastVersion());
+        }
 
         if (isDone()) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 39038ba..1797d64 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1560,8 +1560,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                             T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
 
-                            GridDhtLocalPartition part = grp.topology()
-                                .localPartition(i, AffinityTopologyVersion.NONE, true);
+                            GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
 
                             assert part != null;
 
@@ -1621,8 +1620,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param dataEntry Data entry to apply.
      */
     private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
-        GridDhtLocalPartition locPart = cacheCtx.topology()
-            .localPartition(dataEntry.partitionId(), AffinityTopologyVersion.NONE, true);
+        GridDhtLocalPartition locPart = cacheCtx.topology().forceCreatePartition(dataEntry.partitionId());
 
         switch (dataEntry.op()) {
             case CREATE:

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 23043d1..7d8620a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -331,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         testAffinitySimpleSequentialStart();
 
@@ -351,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1)));
 
         startServer(0, 1);
 
@@ -391,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
 
         startServer(0, 1);
         startServer(1, 2);
@@ -439,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         Ignite ignite0 = startServer(0, 1);
 
@@ -467,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         Ignite ignite0 = startServer(0, 1);
 
@@ -520,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
      */
     private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception {
         if (!cacheOnCrd)
-            cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+            cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
 
         startServer(0, 1);
 
@@ -2069,7 +2069,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
                     exclude.add("server-" + (srvIdx + rnd.nextInt(10)));
             }
 
-            ccfg.setNodeFilter(new CacheNodeFilter(exclude));
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
         }
 
         ccfg.setName(name);
@@ -2645,28 +2645,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     /**
      *
      */
-    static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
-        /** */
-        private Collection<String> excludeNodes;
-
-        /**
-         * @param excludeNodes Nodes names.
-         */
-        public CacheNodeFilter(Collection<String> excludeNodes) {
-            this.excludeNodes = excludeNodes;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(ClusterNode clusterNode) {
-            String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
-
-            return !excludeNodes.contains(name);
-        }
-    }
-
-    /**
-     *
-     */
     static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
         private boolean blockCustomEvt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
new file mode 100644
index 0000000..c64ed0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
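+/**
+ * Checks partition states and the rebalance-finished flag on all nodes as caches, clients and servers are started.
+ */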
+public class CachePartitionStateTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag applied to the configuration of nodes started afterwards. */
+    private boolean client;
+
+    /** Cache configuration for the next configured node; cleared once it has been applied. */
+    private CacheConfiguration ccfg;
+
+    /** {@inheritDoc} */
+    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setClientMode(client);
+
+        if (ccfg != null) {
+            cfg.setCacheConfiguration(ccfg);
+
+            ccfg = null;
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(); // Every test method starts its own nodes, so none are kept for the next test.
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_1() throws Exception {
+        partitionState1(0, true); // No backups, no node filter.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_2() throws Exception {
+        partitionState1(1, true); // One backup, no node filter.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception {
+        partitionState1(1, false); // One backup; the node filter excludes node 0.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_3() throws Exception {
+        partitionState1(100, true); // 100 backups (more than the number of nodes started), no node filter.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_1() throws Exception {
+        partitionState2(0, true); // No backups, no node filter.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_2() throws Exception {
+        partitionState2(1, true); // One backup, no node filter.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception {
+        partitionState2(1, false); // One backup; the node filter excludes node 0.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_3() throws Exception {
+        partitionState2(100, true); // 100 backups (more than the number of nodes started), no node filter.
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param crdAffNode If {@code false} cache is not created on coordinator.
+     * @throws Exception If failed.
+     */
+    private void partitionState1(int backups, boolean crdAffNode) throws Exception {
+        startGrids(3);
+
+        blockSupplySend(DEFAULT_CACHE_NAME); // Applies to the nodes started so far; lifted by stopBlock() below.
+
+        CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); // Local; shadows the field.
+
+        if (!crdAffNode)
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+        ignite(1).createCache(ccfg); // Cache is created dynamically rather than through node configuration.
+
+        AffinityAssignment assign0 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(3, 1)); // Presumably 3 servers + minor version of cache start - confirm.
+
+        awaitPartitionMapExchange();
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        client = true; // The node started next is a client.
+
+        Ignite clientNode = startGrid(4);
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); // Client join must not change states.
+
+        clientNode.cache(DEFAULT_CACHE_NAME);
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); // Nor must cache access from the client.
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        client = false;
+
+        startGrid(5);
+
+        checkRebalance(DEFAULT_CACHE_NAME, false); // Supply messages are still blocked at this point.
+
+        for (int i = 0; i < 3; i++)
+            checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING);
+
+        AffinityAssignment assign1 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 0)); // Presumably the version of the join of grid 5 - confirm.
+
+        checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING);
+
+        stopBlock();
+
+        awaitPartitionMapExchange();
+
+        AffinityAssignment assign2 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 1)); // Presumably the affinity change after rebalancing - confirm.
+
+        checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        if (!crdAffNode)
+            ignite(0).cache(DEFAULT_CACHE_NAME); // Access the cache from the node excluded by the filter.
+
+        checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        startGrid(6);
+
+        awaitPartitionMapExchange();
+
+        AffinityAssignment assign3 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(6, 1)); // Presumably the affinity change after rebalancing - confirm.
+
+        checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param crdAffNode If {@code false} cache is not created on coordinator.
+     * @throws Exception If failed.
+     */
+    private void partitionState2(int backups, boolean crdAffNode) throws Exception {
+        startGrids(3);
+
+        blockSupplySend(DEFAULT_CACHE_NAME); // Applies to the nodes started so far; lifted by stopBlock() below.
+
+        ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); // Field: applied by getConfiguration() to grid 4.
+
+        if (!crdAffNode)
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+        startGrid(4); // The cache comes with this node's configuration instead of createCache().
+
+        AffinityAssignment assign0 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(4, 0)); // Presumably the version of the join of grid 4 - confirm.
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        if (!crdAffNode)
+            ignite(0).cache(DEFAULT_CACHE_NAME); // Access the cache from the node excluded by the filter.
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        stopBlock(); // Unlike partitionState1, supply is unblocked before the next server joins.
+
+        startGrid(5);
+
+        AffinityAssignment assign1 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 1)); // Presumably the affinity change after rebalancing - confirm.
+
+        awaitPartitionMapExchange();
+
+        checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+    }
+
+    /**
+     * @param assign Affinity assignment that defines which partitions each node should hold.
+     * @param cacheName Cache name.
+     * @param expState State expected for every partition assigned to a node by {@code assign}.
+     */
+    private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) {
+        for (Ignite node : G.allGrids()) // Each node is checked in turn via checkNodePartitions().
+            checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState);
+    }
+
+    /**
+     * @param assign Affinity assignment that defines which partitions {@code clusterNode} should hold.
+     * @param clusterNode Node whose partitions are checked.
+     * @param cacheName Cache name.
+     * @param expState State expected for the partitions assigned to {@code clusterNode}.
+     */
+    private void checkNodePartitions(AffinityAssignment assign,
+        ClusterNode clusterNode,
+        String cacheName,
+        GridDhtPartitionState expState)
+    {
+        Affinity<Object> aff = ignite(0).affinity(cacheName); // Used for the partition count and per-node lists.
+
+        Set<Integer> nodeParts = new HashSet<>(); // Primary and backup partitions assigned to clusterNode.
+
+        nodeParts.addAll(assign.primaryPartitions(clusterNode.id()));
+        nodeParts.addAll(assign.backupPartitions(clusterNode.id()));
+
+        log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']');
+
+        if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName))
+            assertFalse(nodeParts.isEmpty()); // An affinity node is expected to be assigned some partition.
+
+        boolean check = false; // Becomes true once a node that has the cache started was inspected.
+
+        for (Ignite node : G.allGrids()) {
+            GridCacheAdapter cache =
+                ((IgniteKernal)node).context().cache().internalCache(cacheName);
+
+            if (cache != null) {
+                check = true;
+
+                GridDhtPartitionTopology top = cache.context().topology();
+
+                GridDhtPartitionMap partsMap = top.partitions(clusterNode.id()); // clusterNode's map as seen by 'node'.
+
+                for (int p = 0; p < aff.partitions(); p++) {
+                    if (nodeParts.contains(p)) {
+                        assertNotNull(partsMap);
+                        assertEquals(expState, partsMap.get(p));
+                    }
+                    else {
+                        if (partsMap != null) { // A partition that is not assigned may only be absent or EVICTED.
+                            GridDhtPartitionState state = partsMap.get(p);
+
+                            assertTrue("Unexpected state: " + state, state == null || state == EVICTED);
+                        }
+                    }
+                }
+            }
+            else { // A node without the cache must hold neither primary nor backup partitions.
+                assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length);
+                assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length);
+            }
+        }
+
+        assertTrue(check);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param expDone Rebalance-finished flag expected on every node that has the cache started.
+     */
+    private void checkRebalance(String cacheName, boolean expDone) {
+        for (Ignite node : G.allGrids()) {
+            IgniteKernal node0 = (IgniteKernal)node;
+
+            GridCacheAdapter cache = node0.context().cache().internalCache(cacheName);
+
+            AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion();
+
+            if (cache != null)
+                assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer));
+            else // A node without the cache must not be an affinity node for it (result was ignored before).
+                assertFalse(node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName));
+        }
+    }
+
+    /**
+     * @param cacheName Name of the cache whose supply messages are blocked on every node returned by G.allGrids().
+     */
+    private void blockSupplySend(String cacheName) {
+        for (Ignite node : G.allGrids()) // Nodes configured later get a fresh SPI instance from getConfiguration().
+            blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName);
+    }
+
+    /**
+     * @param spi Communication SPI on which supply messages are blocked.
+     * @param cacheName Name of the cache whose supply messages are blocked.
+     */
+    private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
+        final int grpId = CU.cacheId(cacheName); // Compared with the group ID of the message below.
+
+        spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) && // Exact class match only.
+                    ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
+            }
+        });
+    }
+
+    /**
+     * Calls {@code stopBlock()} on the recording communication SPI of every node returned by G.allGrids().
+     */
+    private void stopBlock() {
+        for (Ignite node : G.allGrids())
+            TestRecordingCommunicationSpi.spi(node).stopBlock();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param backups Backups number.
+     * @return New cache configuration with FULL_SYNC write synchronization and the given number of backups.
+     */
+    private CacheConfiguration cacheConfiguration(String name, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration(name); // Local variable; the 'ccfg' field is not touched.
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
new file mode 100644
index 0000000..a3f7d27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ *
+ */
+public class TestCacheNodeExcludingFilter implements IgnitePredicate<ClusterNode> {
+    /** */
+    private Collection<String> excludeNodes;
+
+    /**
+     * @param excludeNodes Nodes names.
+     */
+    public TestCacheNodeExcludingFilter(Collection<String> excludeNodes) {
+        this.excludeNodes = excludeNodes;
+    }
+    /**
+     * @param excludeNodes Nodes names.
+     */
+    public TestCacheNodeExcludingFilter(String... excludeNodes) {
+        this.excludeNodes = Arrays.asList(excludeNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode clusterNode) {
+        String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
+
+        return !excludeNodes.contains(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
new file mode 100644
index 0000000..25626f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private CacheConfiguration[] ccfgs;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        if (ccfgs != null) {
+            cfg.setCacheConfiguration(ccfgs);
+
+            ccfgs = null;
+        }
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setPageSize(1024);
+        memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+
+        pCfg.setWalMode(WALMode.LOG_ONLY);
+
+        cfg.setPersistentStoreConfiguration(pCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        GridTestUtils.deleteDbFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        GridTestUtils.deleteDbFiles();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestoreAndNewCache1() throws Exception {
+        restoreAndNewCache(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestoreAndNewCache2() throws Exception {
+        restoreAndNewCache(true);
+    }
+
+    /**
+     * @param createNew If {@code true} new cache is added while node is stopped.
+     * @throws Exception If failed.
+     */
+    private void restoreAndNewCache(boolean createNew) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            ccfgs = configurations1();
+
+            startGrid(i);
+        }
+
+        ignite(0).active(true);
+
+        IgniteCache<Object, Object> cache1 = ignite(2).cache("c1");
+
+        List<Integer> keys = primaryKeys(cache1, 10);
+
+        for (Integer key : keys)
+            cache1.put(key, key);
+
+        stopGrid(2);
+
+        if (createNew) {
+            // New cache is added when node is stopped.
+            ignite(0).getOrCreateCaches(Arrays.asList(configurations2()));
+        }
+        else {
+            // New cache is added on node restart.
+            ccfgs = configurations2();
+        }
+
+        startGrid(2);
+
+        cache1 = ignite(2).cache("c1");
+
+        IgniteCache<Object, Object> cache2 = ignite(2).cache("c2");
+
+        for (Integer key : keys) {
+            assertEquals(key, cache1.get(key));
+
+            assertNull(cache2.get(key));
+
+            cache2.put(key, key);
+
+            assertEquals(key, cache2.get(key));
+        }
+
+        List<Integer> nearKeys = nearKeys(cache1, 10, 0);
+
+        for (Integer key : nearKeys) {
+            assertNull(cache1.get(key));
+            assertNull(cache2.get(key));
+
+            cache2.put(key, key);
+            assertEquals(key, cache2.get(key));
+
+            cache1.put(key, key);
+            assertEquals(key, cache1.get(key));
+        }
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        for (Integer key : nearKeys) {
+            assertEquals(key, cache2.get(key));
+
+            assertEquals(key, cache1.get(key));
+        }
+    }
+
+    /**
+     * @return Configurations set 1.
+     */
+    private CacheConfiguration[] configurations1() {
+        CacheConfiguration[] ccfgs = new CacheConfiguration[1];
+
+        ccfgs[0] = cacheConfiguration("c1");
+
+        return ccfgs;
+    }
+
+    /**
+     * @return Configurations set 2.
+     */
+    private CacheConfiguration[] configurations2() {
+        CacheConfiguration[] ccfgs = new CacheConfiguration[2];
+
+        ccfgs[0] = cacheConfiguration("c1");
+        ccfgs[1] = cacheConfiguration("c2");
+
+        return ccfgs;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration(name);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
new file mode 100644
index 0000000..bb32d24
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+
+/**
+ * Test suite.
+ */
+public class IgniteCacheTestSuite6 extends TestSuite {
+    /**
+     * @return IgniteCache test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("IgniteCache Test Suite part 6");
+
+        suite.addTestSuite(CachePartitionStateTest.class);
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 5b562c3..5762c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactiva
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsEvictionTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest;
@@ -74,6 +75,8 @@ public class IgnitePdsTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
 
+        suite.addTestSuite(IgnitePdsCacheRestoreTest.class);
+
         return suite;
     }
 }


[03/16] ignite git commit: IGNITE-5788 Web Console: Fixed dependencies for maven project with c3p0.

Posted by sb...@apache.org.
IGNITE-5788 Web Console: Fixed dependencies for maven project with c3p0.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02a1bdca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02a1bdca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02a1bdca

Branch: refs/heads/ignite-5578
Commit: 02a1bdca57ce6af7fe7636b0a9f99048c89b88b6
Parents: 70d0f99
Author: Andrey Novikov <an...@gridgain.com>
Authored: Thu Jul 20 15:47:49 2017 +0700
Committer: Andrey Novikov <an...@gridgain.com>
Committed: Thu Jul 20 15:47:49 2017 +0700

----------------------------------------------------------------------
 modules/web-console/frontend/app/data/pom-dependencies.json | 5 ++++-
 .../app/modules/configuration/generator/Maven.service.js    | 9 +++------
 modules/web-console/frontend/webpack/webpack.common.js      | 4 ++--
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/app/data/pom-dependencies.json
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/data/pom-dependencies.json b/modules/web-console/frontend/app/data/pom-dependencies.json
index 945e3f5..8d3fa81 100644
--- a/modules/web-console/frontend/app/data/pom-dependencies.json
+++ b/modules/web-console/frontend/app/data/pom-dependencies.json
@@ -11,7 +11,10 @@
     "HadoopIgfsJcl": {"artifactId": "ignite-hadoop"},
     "SLF4J": {"artifactId": "ignite-slf4j"},
 
-    "Generic": {"groupId": "com.mchange", "artifactId": "c3p0", "version": "0.9.5.2"},
+    "Generic": [
+        {"groupId": "com.mchange", "artifactId": "c3p0", "version": "0.9.5.2"},
+        {"groupId": "com.mchange", "artifactId": "mchange-commons-java", "version": "0.2.11"}
+    ],
     "MySQL": {"groupId": "mysql", "artifactId": "mysql-connector-java", "version": "5.1.40"},
     "PostgreSQL": {"groupId": "org.postgresql", "artifactId": "postgresql", "version": "9.4.1212.jre7"},
     "H2": {"groupId": "com.h2database", "artifactId": "h2", "version": [

http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js b/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
index 81d7d10..abbada9 100644
--- a/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
+++ b/modules/web-console/frontend/app/modules/configuration/generator/Maven.service.js
@@ -47,12 +47,9 @@ export default class IgniteMavenGenerator {
             return _.isArray(version) ? _.find(version, (v) => versionService.since(igniteVer, v.range)).version : version;
         };
 
-        if (!_.has(POM_DEPENDENCIES, key))
-            return;
-
-        const {groupId, artifactId, version, jar} = POM_DEPENDENCIES[key];
-
-        this.addDependency(deps, groupId || 'org.apache.ignite', artifactId, extractVersion(version) || dfltVer, jar);
+        _.forEach(POM_DEPENDENCIES[key], ({groupId, artifactId, version, jar}) => {
+            this.addDependency(deps, groupId || 'org.apache.ignite', artifactId, extractVersion(version) || dfltVer, jar);
+        });
     }
 
     addResource(sb, dir, exclude) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/02a1bdca/modules/web-console/frontend/webpack/webpack.common.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/webpack/webpack.common.js b/modules/web-console/frontend/webpack/webpack.common.js
index a303d6e..5a3763e 100644
--- a/modules/web-console/frontend/webpack/webpack.common.js
+++ b/modules/web-console/frontend/webpack/webpack.common.js
@@ -138,7 +138,7 @@ export default {
             },
             {
                 test: /\.(jpe?g|png|gif)$/i,
-                loader: 'file?name=assets/images/[name]_[hash].[ext]'
+                loader: 'file?name=assets/images/[name].[hash].[ext]'
             },
             {
                 test: require.resolve('jquery'),
@@ -178,7 +178,7 @@ export default {
         new HtmlWebpackPlugin({
             template: './views/index.pug'
         }),
-        new ExtractTextPlugin({filename: 'assets/css/[name].css', allChunks: true}),
+        new ExtractTextPlugin({filename: 'assets/css/[name].[hash].css', allChunks: true}),
         new CopyWebpackPlugin([
             { context: 'public', from: '**/*.png' },
             { context: 'public', from: '**/*.svg' },


[13/16] ignite git commit: Merge branch 'ignite-2.1.3'

Posted by sb...@apache.org.
Merge branch 'ignite-2.1.3'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca496f6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca496f6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca496f6e

Branch: refs/heads/ignite-5578
Commit: ca496f6e9e48a107abb72a4fea87b5b11da66806
Parents: 0adaf6e c1a3b37
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 21 17:05:10 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 17:05:10 2017 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionMap.java      |  7 +-
 .../wal/FileWriteAheadLogManager.java           | 93 +++++++++++++-------
 .../dotnet/PlatformDotNetCacheStore.java        | 31 +++++++
 .../Apache.Ignite.Core.Tests.csproj             |  5 ++
 .../Cache/Store/CacheStoreSessionTest.cs        | 90 +++++++++++++------
 .../Store/CacheStoreSessionTestCodeConfig.cs    | 68 ++++++++++++++
 .../Store/CacheStoreSessionTestSharedFactory.cs | 48 ++++++++++
 .../Cache/Store/CacheStoreTest.cs               | 10 ++-
 .../cache-store-session-shared-factory.xml      | 76 ++++++++++++++++
 .../Config/Cache/Store/cache-store-session.xml  | 20 ++---
 .../Impl/Cache/Store/CacheStoreInternal.cs      | 14 ++-
 modules/web-console/frontend/app/app.js         | 14 +++
 .../frontend/app/data/pom-dependencies.json     |  5 +-
 .../configuration/generator/Maven.service.js    |  9 +-
 .../frontend/app/modules/states/errors.state.js |  6 +-
 .../frontend/app/modules/states/signin.state.js | 10 ++-
 .../frontend/webpack/webpack.common.js          |  4 +-
 17 files changed, 422 insertions(+), 88 deletions(-)
----------------------------------------------------------------------



[05/16] ignite git commit: IGNITE-5776: Add option to turn on filter reachable addresses in TcpCommunicationSpi. This closes #2317.

Posted by sb...@apache.org.
IGNITE-5776: Add option to turn on filter reachable addresses in TcpCommunicationSpi. This closes #2317.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd7a08e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd7a08e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd7a08e3

Branch: refs/heads/ignite-5578
Commit: bd7a08e31d03b2c51b225cf388dc1197348a1593
Parents: e285f9d
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Thu Jul 20 13:32:18 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Thu Jul 20 13:32:18 2017 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 77 ++++++++++++++------
 1 file changed, 56 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bd7a08e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 35d3032..5b952e8 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -311,6 +311,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
     public static final boolean DFLT_TCP_NODELAY = true;
 
+    /** Default value for {@code FILTER_REACHABLE_ADDRESSES} socket option (value is <tt>false</tt>). */
+    public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false;
+
     /** Default received messages threshold for sending ack. */
     public static final int DFLT_ACK_SND_THRESHOLD = 32;
 
@@ -1016,6 +1019,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
+    /** {@code FILTER_REACHABLE_ADDRESSES} option value for created sockets. */
+    private boolean filterReachableAddresses = DFLT_FILTER_REACHABLE_ADDRESSES;
+
     /** Number of received messages after which acknowledgment is sent. */
     private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
 
@@ -1626,6 +1632,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * Gets value for {@code FILTER_REACHABLE_ADDRESSES} socket option.
+     *
+     * @return {@code True} if needed to filter reachable addresses.
+     */
+    public boolean isFilterReachableAddresses() {
+        return filterReachableAddresses;
+    }
+
+    /**
+     * Setting this option to {@code true} enables filtering of reachable
+     * addresses when creating a TCP client.
+     * <p>
+     * Usually it is advised to set this value to {@code false}.
+     * <p>
+     * If not provided, default value is {@link #DFLT_FILTER_REACHABLE_ADDRESSES}.
+     *
+     * @param filterReachableAddresses {@code True} to filter reachable addresses.
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) {
+        this.filterReachableAddresses = filterReachableAddresses;
+
+        return this;
+    }
+
+    /**
      * Sets receive buffer size for sockets created or accepted by this SPI.
      * <p>
      * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
@@ -2952,35 +2985,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         if (isExtAddrsExist)
             addrs.addAll(extAddrs);
 
-        Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+        if (filterReachableAddresses) {
+            Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
 
-        for (InetSocketAddress addr : addrs) {
-            // Skip unresolved as addr.getAddress() can return null.
-            if(!addr.isUnresolved())
-                allInetAddrs.add(addr.getAddress());
-        }
+            for (InetSocketAddress addr : addrs) {
+                // Skip unresolved as addr.getAddress() can return null.
+                if (!addr.isUnresolved())
+                    allInetAddrs.add(addr.getAddress());
+            }
 
-        List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+            List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
 
-        if (reachableInetAddrs.size() < allInetAddrs.size()) {
-            LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+            if (reachableInetAddrs.size() < allInetAddrs.size()) {
+                LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
 
-            List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
+                List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
 
-            for (InetSocketAddress addr : addrs) {
-                if (reachableInetAddrs.contains(addr.getAddress()))
-                    addrs0.add(addr);
-                else
-                    unreachableInetAddr.add(addr);
-            }
+                for (InetSocketAddress addr : addrs) {
+                    if (reachableInetAddrs.contains(addr.getAddress()))
+                        addrs0.add(addr);
+                    else
+                        unreachableInetAddr.add(addr);
+                }
 
-            addrs0.addAll(unreachableInetAddr);
+                addrs0.addAll(unreachableInetAddr);
 
-            addrs = addrs0;
-        }
+                addrs = addrs0;
+            }
 
-        if (log.isDebugEnabled())
-            log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+            if (log.isDebugEnabled())
+                log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+        }
 
         boolean conn = false;
         GridCommunicationClient client = null;


[10/16] ignite git commit: IGNITE-5752 Fixed updateSequence updating in GridDhtPartitionMap. - Fixes #2297.

Posted by sb...@apache.org.
IGNITE-5752 Fixed updateSequence updating in GridDhtPartitionMap. - Fixes #2297.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/199b9543
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/199b9543
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/199b9543

Branch: refs/heads/ignite-5578
Commit: 199b954345f179851718acd131188506668cd4f3
Parents: 02a1bdc
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Jul 21 16:29:15 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Jul 21 16:29:15 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/preloader/GridDhtPartitionMap.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/199b9543/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index cfd4400..735ca1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -24,7 +24,6 @@ import java.io.ObjectOutput;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
@@ -202,9 +201,13 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
      * @return Old update sequence value.
      */
     public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
+        assert topVer.compareTo(top) >= 0 : "Invalid topology version [cur=" + top + ", new=" + topVer + "]";
+
         long old = this.updateSeq;
 
-        assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
+        // Overwrite update sequence without checking in case of greater topology version
+        if (topVer.compareTo(top) == 0)
+            assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
 
         this.updateSeq = updateSeq;
 


[15/16] ignite git commit: 5578

Posted by sb...@apache.org.
5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/530ca724
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/530ca724
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/530ca724

Branch: refs/heads/ignite-5578
Commit: 530ca72492bd1e0d2c92035b17fb6e8ce674e974
Parents: 5a663e6 aeb9336
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 24 11:56:58 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 24 11:56:58 2017 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |  18 +-
 .../dht/preloader/GridDhtPartitionMap.java      |   7 +-
 .../GridDhtPartitionsExchangeFuture.java        |  17 +-
 .../wal/FileWriteAheadLogManager.java           |  93 ++--
 .../processors/job/GridJobProcessor.java        |  10 +-
 .../dotnet/PlatformDotNetCacheStore.java        |  31 ++
 .../visor/cache/VisorCacheJdbcType.java         |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  77 +++-
 .../internal/IgniteComputeJobOneThreadTest.java |  75 ++++
 .../dht/IgniteCacheMultiTxLockSelfTest.java     |   2 -
 .../testsuites/IgniteCacheTestSuite6.java       |  38 ++
 .../testsuites/IgniteComputeGridTestSuite.java  |   3 +
 .../apache/ignite/ml/math/DistanceMeasure.java  |   2 +-
 .../ignite/ml/math/EuclideanDistance.java       |   3 +-
 .../math/decompositions/EigenDecomposition.java |   2 +-
 .../apache/ignite/ml/math/impls/CacheUtils.java | 198 +++++++--
 .../ml/math/impls/matrix/AbstractMatrix.java    |   4 +-
 .../ignite/ml/math/impls/matrix/BlockEntry.java |  50 +++
 .../ml/math/impls/matrix/CacheMatrix.java       |   9 +-
 .../matrix/SparseBlockDistributedMatrix.java    | 208 +++++++++
 .../impls/matrix/SparseDistributedMatrix.java   |  26 +-
 .../storage/matrix/BaseBlockMatrixKey.java      |  41 ++
 .../impls/storage/matrix/BlockMatrixKey.java    | 144 ++++++
 .../storage/matrix/BlockMatrixStorage.java      | 435 +++++++++++++++++++
 .../vector/SparseLocalOnHeapVectorStorage.java  |   4 +-
 .../ignite/ml/math/statistics/Variance.java     |   1 +
 .../ignite/ml/math/statistics/package-info.java |  22 +
 .../org/apache/ignite/ml/math/util/MapUtil.java |   2 +-
 .../ignite/ml/math/util/package-info.java       |  22 +
 .../java/org/apache/ignite/ml/package-info.java |  22 +
 .../ml/math/MathImplDistributedTestSuite.java   |   2 +
 .../SparseDistributedBlockMatrixTest.java       | 379 ++++++++++++++++
 .../matrix/SparseDistributedMatrixTest.java     |  32 +-
 .../Apache.Ignite.Core.Tests.csproj             |   5 +
 .../Cache/Store/CacheStoreSessionTest.cs        |  90 ++--
 .../Store/CacheStoreSessionTestCodeConfig.cs    |  68 +++
 .../Store/CacheStoreSessionTestSharedFactory.cs |  48 ++
 .../Cache/Store/CacheStoreTest.cs               |  10 +-
 .../cache-store-session-shared-factory.xml      |  76 ++++
 .../Config/Cache/Store/cache-store-session.xml  |  20 +-
 .../Impl/Cache/Store/CacheStoreInternal.cs      |  14 +-
 modules/web-console/frontend/app/app.js         |  14 +
 .../frontend/app/data/pom-dependencies.json     |   5 +-
 .../configuration/generator/Maven.service.js    |   9 +-
 .../frontend/app/modules/states/errors.state.js |   6 +-
 .../frontend/app/modules/states/signin.state.js |  10 +-
 .../frontend/webpack/webpack.common.js          |   4 +-
 47 files changed, 2144 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/530ca724/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3674276,cdb4bb7..5d40084
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1226,21 -1109,22 +1226,19 @@@ public class GridDhtPartitionsExchangeF
          }
  
          if (cctx.kernalContext().clientNode()) {
--            msg = new GridDhtPartitionsSingleMessage(exchangeId(),
++            msg = new GridDhtPartitionsSingleMessage(exchangeId(),
                  true,
                  null,
                  true);
          }
          else {
-             msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
 -            msg = cctx.exchange().createPartitionsSingleMessage(node,
 -                exchangeId(),
--                false,
--                true);
 -        }
++            msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
++                false, true);
  
 -        Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 +            Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
  
-             if (partHistReserved0 != null)
-                 msg.partitionHistoryCounters(partHistReserved0);
-         }
+         if (partHistReserved0 != null)
 -            msg.partitionHistoryCounters(partHistReserved0);
++            msg.partitionHistoryCounters(partHistReserved0);}
  
          if (stateChangeExchange() && changeGlobalStateE != null)
              msg.setError(changeGlobalStateE);
@@@ -1646,29 -1371,12 +1644,28 @@@
       * @param node Sender node.
       * @param msg Single partition info.
       */
 -    public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
 +    public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
 +        assert !node.isDaemon() : node;
          assert msg != null;
-         assert exchId.equals(msg.exchangeId()) : msg;
-         assert !cctx.kernalContext().clientNode();
 -        assert msg.exchangeId().equals(exchId) : msg;
++        assert exchId.equals(msg.exchangeId()) : msg;assert !cctx.kernalContext().clientNode();
 +
 +        if (msg.restoreState()) {
 +            InitNewCoordinatorFuture newCrdFut0;
 +
 +            synchronized (this) {
 +                assert newCrdFut != null;
 +
 +                newCrdFut0 = newCrdFut;
 +            }
 +
 +            newCrdFut0.onMessage(node, msg);
 +
 +            return;
 +        }
 +
  
          if (!msg.client()) {
--            assert msg.lastVersion() != null : msg;
++            assert msg.lastVersion() != null : msg;
  
              updateLastVersion(msg.lastVersion());
          }


[02/16] ignite git commit: IGNITE-4728 Web Console: Saved last succeeded state and redirect to it on reload.

Posted by sb...@apache.org.
IGNITE-4728 Web Console: Saved last succeeded state and redirect to it on reload.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70d0f991
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70d0f991
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70d0f991

Branch: refs/heads/ignite-5578
Commit: 70d0f9918c708cb117e69163cc7b7c119c9a693c
Parents: 23f26af
Author: Dmitriy Shabalin <ds...@gridgain.com>
Authored: Thu Jul 20 15:08:20 2017 +0700
Committer: Andrey Novikov <an...@gridgain.com>
Committed: Thu Jul 20 15:08:20 2017 +0700

----------------------------------------------------------------------
 modules/web-console/frontend/app/app.js               | 14 ++++++++++++++
 .../frontend/app/modules/states/errors.state.js       |  6 ++++--
 .../frontend/app/modules/states/signin.state.js       | 10 +++++++++-
 3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/app.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/app.js b/modules/web-console/frontend/app/app.js
index c707810..dc5c6e9 100644
--- a/modules/web-console/frontend/app/app.js
+++ b/modules/web-console/frontend/app/app.js
@@ -289,6 +289,20 @@ angular
     $root.$on('$stateChangeStart', () => {
         _.forEach(angular.element('.modal'), (m) => angular.element(m).scope().$hide());
     });
+
+    if (!$root.IgniteDemoMode) {
+        $root.$on('$stateChangeSuccess', (event, {name, unsaved}, params) => {
+            try {
+                if (unsaved)
+                    localStorage.removeItem('lastStateChangeSuccess');
+                else
+                    localStorage.setItem('lastStateChangeSuccess', JSON.stringify({name, params}));
+            }
+            catch (ignored) {
+                // No-op.
+            }
+        });
+    }
 }])
 .run(['$rootScope', '$http', '$state', 'IgniteMessages', 'User', 'IgniteNotebookData',
     ($root, $http, $state, Messages, User, Notebook) => { // eslint-disable-line no-shadow

http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/modules/states/errors.state.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/states/errors.state.js b/modules/web-console/frontend/app/modules/states/errors.state.js
index e816ff8..e3d4d41 100644
--- a/modules/web-console/frontend/app/modules/states/errors.state.js
+++ b/modules/web-console/frontend/app/modules/states/errors.state.js
@@ -31,13 +31,15 @@ angular
                 templateUrl: templateNotFoundPage,
                 metaTags: {
                     title: 'Page not found'
-                }
+                },
+                unsaved: true
             })
             .state('403', {
                 url: '/403',
                 templateUrl: templateNotAuthorizedPage,
                 metaTags: {
                     title: 'Not authorized'
-                }
+                },
+                unsaved: true
             });
     }]);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70d0f991/modules/web-console/frontend/app/modules/states/signin.state.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/states/signin.state.js b/modules/web-console/frontend/app/modules/states/signin.state.js
index 5155bde..b7be51d 100644
--- a/modules/web-console/frontend/app/modules/states/signin.state.js
+++ b/modules/web-console/frontend/app/modules/states/signin.state.js
@@ -33,7 +33,15 @@ angular
         resolve: {
             user: ['$state', 'User', ($state, User) => {
                 return User.read()
-                    .then(() => $state.go('base.configuration.tabs'))
+                    .then(() => {
+                        try {
+                            const {name, params} = JSON.parse(localStorage.getItem('lastStateChangeSuccess'));
+
+                            $state.go(name, params);
+                        } catch (ignored) {
+                            $state.go('base.configuration.tabs');
+                        }
+                    })
                     .catch(() => {});
             }]
         },


[12/16] ignite git commit: IGNITE-5786 .NET: Fix cache store session handling for cross-cache transactions

Posted by sb...@apache.org.
IGNITE-5786 .NET: Fix cache store session handling for cross-cache transactions

This closes #2331


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1a3b374
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1a3b374
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1a3b374

Branch: refs/heads/ignite-5578
Commit: c1a3b3744f89e27906621e62e9d73281791fcf30
Parents: 6de0571
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 21 17:04:39 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 21 17:04:39 2017 +0300

----------------------------------------------------------------------
 .../dotnet/PlatformDotNetCacheStore.java        | 31 +++++++
 .../Apache.Ignite.Core.Tests.csproj             |  5 ++
 .../Cache/Store/CacheStoreSessionTest.cs        | 90 +++++++++++++-------
 .../Store/CacheStoreSessionTestCodeConfig.cs    | 68 +++++++++++++++
 .../Store/CacheStoreSessionTestSharedFactory.cs | 48 +++++++++++
 .../Cache/Store/CacheStoreTest.cs               | 10 ++-
 .../cache-store-session-shared-factory.xml      | 76 +++++++++++++++++
 .../Config/Cache/Store/cache-store-session.xml  | 20 ++---
 .../Impl/Cache/Store/CacheStoreInternal.cs      | 14 ++-
 9 files changed, 320 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index dd61a54..471eb01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -47,6 +47,7 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 /**
@@ -90,6 +91,9 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
     /** Key used to distinguish session deployment.  */
     private static final Object KEY_SES = new Object();
 
+    /** Key to designate a set of stores that share current session.  */
+    private static final Object KEY_SES_STORES = new Object();
+
     /** */
     @CacheStoreSessionResource
     private CacheStoreSession ses;
@@ -337,6 +341,23 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
                     writer.writeLong(session());
                     writer.writeString(ses.cacheName());
                     writer.writeBoolean(commit);
+
+                    // When multiple stores (caches) participate in a single transaction,
+                    // they share a single session, but sessionEnd is called on each store.
+                    // Same thing happens on platform side: session is shared; each store must be notified,
+                    // then session should be closed.
+                    Collection<Long> stores = (Collection<Long>) ses.properties().get(KEY_SES_STORES);
+                    assert stores != null;
+
+                    stores.remove(ptr);
+                    boolean last = stores.isEmpty();
+
+                    writer.writeBoolean(last);
+
+                    if (last) {
+                        // Session object has been released on platform side, remove marker.
+                        ses.properties().remove(KEY_SES);
+                    }
                 }
             }, null);
         }
@@ -415,6 +436,16 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
             ses.properties().put(KEY_SES, sesPtr);
         }
 
+        // Keep track of all stores that use current session (cross-cache tx uses single session for all caches).
+        Collection<Long> stores = (Collection<Long>) ses.properties().get(KEY_SES_STORES);
+
+        if (stores == null) {
+            stores = new HashSet<>();
+            ses.properties().put(KEY_SES_STORES, stores);
+        }
+
+        stores.add(ptr);
+
         return sesPtr;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 90b7970..e4f65bc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -89,6 +89,8 @@
     <Compile Include="Cache\Query\Linq\CacheLinqTest.Misc.cs" />
     <Compile Include="Cache\Query\Linq\CacheLinqTest.Custom.cs" />
     <Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" />
+    <Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
+    <Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
     <Compile Include="Deployment\CacheGetFunc.cs" />
     <Compile Include="Deployment\GetAddressFunc.cs" />
     <Compile Include="Deployment\PeerAssemblyLoadingAllApisTest.cs" />
@@ -314,6 +316,9 @@
     <Content Include="Config\Cache\Affinity\affinity-function.xml">
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
     </Content>
+    <Content Include="Config\Cache\Store\cache-store-session-shared-factory.xml">
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+    </Content>
     <Content Include="Config\Cache\Store\cache-store-session.xml">
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
     </Content>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
index 315e285..818948c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -28,16 +28,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
     /// <summary>
     /// Tests for store session.
     /// </summary>
-    public sealed class CacheStoreSessionTest
+    public class CacheStoreSessionTest
     {
-        /** Grid name. */
-        private const string IgniteName = "grid";
-
         /** Cache 1 name. */
-        private const string Cache1 = "cache1";
+        protected const string Cache1 = "cache1";
 
         /** Cache 2 name. */
-        private const string Cache2 = "cache2";
+        protected const string Cache2 = "cache2";
 
         /** Operations. */
         private static ConcurrentBag<ICollection<Operation>> _dumps;
@@ -48,11 +45,26 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
         [TestFixtureSetUp]
         public void BeforeTests()
         {
-            Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            Ignition.Start(GetIgniteConfiguration());
+        }
+
+        /// <summary>
+        /// Gets the ignite configuration.
+        /// </summary>
+        protected virtual IgniteConfiguration GetIgniteConfiguration()
+        {
+            return new IgniteConfiguration(TestUtils.GetTestConfiguration())
             {
-                IgniteInstanceName = IgniteName,
                 SpringConfigUrl = @"config\cache\store\cache-store-session.xml"
-            });
+            };
+        }
+
+        /// <summary>
+        /// Gets the store count.
+        /// </summary>
+        protected virtual int StoreCount
+        {
+            get { return 2; }
         }
 
         /// <summary>
@@ -61,21 +73,29 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
         [TestFixtureTearDown]
         public void AfterTests()
         {
-            Ignition.StopAll(true);
+            try
+            {
+                TestUtils.AssertHandleRegistryHasItems(Ignition.GetIgnite(), 2, 1000);
+            }
+            finally 
+            {
+                Ignition.StopAll(true);
+            }
         }
         
         /// <summary>
         /// Test basic session API.
         /// </summary>
         [Test]
+        [Timeout(30000)]
         public void TestSession()
         {
             _dumps = new ConcurrentBag<ICollection<Operation>>();
 
-            var ignite = Ignition.GetIgnite(IgniteName);
+            var ignite = Ignition.GetIgnite();
 
-            var cache1 = Ignition.GetIgnite(IgniteName).GetCache<int, int>(Cache1);
-            var cache2 = Ignition.GetIgnite(IgniteName).GetCache<int, int>(Cache2);
+            var cache1 = ignite.GetCache<int, int>(Cache1);
+            var cache2 = ignite.GetCache<int, int>(Cache2);
 
             // 1. Test rollback.
             using (var tx = ignite.GetTransactions().TxStart())
@@ -86,11 +106,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
                 tx.Rollback();
             }
 
-            Assert.AreEqual(1, _dumps.Count);
-            var ops = _dumps.First();
-            Assert.AreEqual(1, ops.Count);
+            // SessionEnd is called once per store instance.
+            Assert.AreEqual(StoreCount, _dumps.Count);
 
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && !op.Commit));
+            foreach (var ops in _dumps)
+            {
+                var op = ops.Single();
+                Assert.AreEqual(OperationType.SesEnd, op.Type);
+                Assert.IsFalse(op.Commit);
+            }
 
             _dumps = new ConcurrentBag<ICollection<Operation>>();
 
@@ -103,13 +127,17 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
                 tx.Commit();
             }
 
-            Assert.AreEqual(1, _dumps.Count);
-            ops = _dumps.First();
-            Assert.AreEqual(3, ops.Count);
+            Assert.AreEqual(StoreCount, _dumps.Count);
 
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache1.Equals(op.CacheName) && 1.Equals(op.Key) && 1.Equals(op.Value)));
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache2.Equals(op.CacheName) && 2.Equals(op.Key) && 2.Equals(op.Value)));
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+            foreach (var ops in _dumps)
+            {
+                Assert.AreEqual(2 + StoreCount, ops.Count);
+                Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write
+                                                   && Cache1 == op.CacheName && 1 == op.Key && 1 == op.Value));
+                Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write
+                                                   && Cache2 == op.CacheName && 2 == op.Key && 2 == op.Value));
+                Assert.AreEqual(StoreCount, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+            }
 
             _dumps = new ConcurrentBag<ICollection<Operation>>();
 
@@ -122,13 +150,17 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
                 tx.Commit();
             }
 
-            Assert.AreEqual(1, _dumps.Count);
-            ops = _dumps.First();
-            Assert.AreEqual(3, ops.Count);
+            Assert.AreEqual(StoreCount, _dumps.Count);
+            foreach (var ops in _dumps)
+            {
+                Assert.AreEqual(2 + StoreCount, ops.Count);
 
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache1.Equals(op.CacheName) && 1.Equals(op.Key)));
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache2.Equals(op.CacheName) && 2.Equals(op.Key)));
-            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+                Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete
+                                                   && Cache1 == op.CacheName && 1 == op.Key));
+                Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete
+                                                   && Cache2 == op.CacheName && 2 == op.Key));
+                Assert.AreEqual(StoreCount, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+            }
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
new file mode 100644
index 0000000..0b5f474
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestCodeConfig.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Common;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests store session with programmatic configuration (uses different store factory on Java side).
+    /// </summary>
+    [TestFixture]
+    public class CacheStoreSessionTestCodeConfig : CacheStoreSessionTest
+    {
+        /** <inheritdoc /> */
+        protected override IgniteConfiguration GetIgniteConfiguration()
+        {
+            return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                CacheConfiguration = new[]
+                {
+                    new CacheConfiguration(Cache1)
+                    {
+                        AtomicityMode = CacheAtomicityMode.Transactional,
+                        ReadThrough = true,
+                        WriteThrough = true,
+                        CacheStoreFactory = new StoreFactory()
+                    },
+                    new CacheConfiguration(Cache2)
+                    {
+                        AtomicityMode = CacheAtomicityMode.Transactional,
+                        ReadThrough = true,
+                        WriteThrough = true,
+                        CacheStoreFactory = new StoreFactory()
+                    }
+                }
+            };
+        }
+
+        /// <summary>
+        /// Store factory.
+        /// </summary>
+        private class StoreFactory : IFactory<ICacheStore>
+        {
+            /** <inheritdoc /> */
+            public ICacheStore CreateInstance()
+            {
+                return new Store();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
new file mode 100644
index 0000000..2af5915
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTestSharedFactory.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Session test with shared PlatformDotNetCacheStoreFactory,
+    /// which causes the same store instance to be used for both caches.
+    /// </summary>
+    [TestFixture]
+    public class CacheStoreSessionTestSharedFactory : CacheStoreSessionTest
+    {
+        /** <inheritdoc /> */
+        protected override IgniteConfiguration GetIgniteConfiguration()
+        {
+            return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                SpringConfigUrl = @"config\cache\store\cache-store-session-shared-factory.xml"
+            };
+        }
+
+        /** <inheritdoc /> */
+        protected override int StoreCount
+        {
+            get
+            {
+                // Shared PlatformDotNetCacheStoreFactory results in a single store instance.
+                return 1;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index e05f4bd..d3e4ab6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -64,7 +64,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
         [TestFixtureTearDown]
         public void AfterTests()
         {
-            Ignition.StopAll(true);
+            try
+            {
+                // 3 stores are expected in HandleRegistry.
+                TestUtils.AssertHandleRegistryHasItems(Ignition.GetIgnite(), 3, 1000);
+            }
+            finally
+            {
+                Ignition.StopAll(true);
+            }
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
new file mode 100644
index 0000000..05515c4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session-shared-factory.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="storeFactory" class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+        <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+    </bean>
+  
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="localHost" value="127.0.0.1"/>
+        <property name="connectorConfiguration"><null/></property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="cache1"/>
+                    <property name="cacheMode" value="LOCAL"/>
+                    <property name="atomicityMode" value="TRANSACTIONAL"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="readThrough" value="true"/>
+
+                    <property name="cacheStoreFactory" ref="storeFactory" />
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="cache2"/>
+                    <property name="cacheMode" value="LOCAL"/>
+                    <property name="atomicityMode" value="TRANSACTIONAL"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="readThrough" value="true"/>
+
+                    <property name="cacheStoreFactory" ref="storeFactory" />
+                </bean>
+            </list>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+                <property name="socketTimeout" value="300" />
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
index 3cc9efa..14dc78e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Store/cache-store-session.xml
@@ -25,18 +25,10 @@
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd">
-    <bean id="storeFactory" class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
-        <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
-    </bean>
-  
     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="localHost" value="127.0.0.1"/>
         <property name="connectorConfiguration"><null/></property>
 
-        <property name="includeEventTypes">
-            <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
-        </property>
-      
         <property name="cacheConfiguration">
             <list>
                 <bean class="org.apache.ignite.configuration.CacheConfiguration">
@@ -46,7 +38,11 @@
                     <property name="writeThrough" value="true"/>
                     <property name="readThrough" value="true"/>
 
-                    <property name="cacheStoreFactory" ref="storeFactory" />
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+                            <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+                        </bean>
+                    </property>
                 </bean>
 
                 <bean class="org.apache.ignite.configuration.CacheConfiguration">
@@ -56,7 +52,11 @@
                     <property name="writeThrough" value="true"/>
                     <property name="readThrough" value="true"/>
 
-                    <property name="cacheStoreFactory" ref="storeFactory" />
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactory">
+                            <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Store.CacheStoreSessionTest+Store, Apache.Ignite.Core.Tests"/>
+                        </bean>
+                    </property>
                 </bean>
             </list>
         </property>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1a3b374/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
index f147579..df4c1ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreInternal.cs
@@ -111,6 +111,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
 
             CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);
 
+            // The session cache name may change during a cross-cache transaction.
+            // A single session is used for all stores in a cross-cache transaction.
             ses.CacheName = rawReader.ReadString();
 
             _sesProxy.SetSession(ses);
@@ -223,11 +225,19 @@ namespace Apache.Ignite.Core.Impl.Cache.Store
                         break;
 
                     case OpSesEnd:
-                        grid.HandleRegistry.Release(sesId);
+                    {
+                        var commit = rawReader.ReadBoolean();
+                        var last = rawReader.ReadBoolean();
 
-                        _store.SessionEnd(rawReader.ReadBoolean());
+                        if (last)
+                        {
+                            grid.HandleRegistry.Release(sesId);
+                        }
+
+                        _store.SessionEnd(commit);
 
                         break;
+                    }
 
                     default:
                         throw new IgniteException("Invalid operation type: " + opType);