You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/04/11 12:28:23 UTC

[01/10] ignite git commit: IGNITE-8201 WIP.

Repository: ignite
Updated Branches:
  refs/heads/ignite-8201 df938d562 -> 3780ac0a0


IGNITE-8201 WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86d0bcb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86d0bcb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86d0bcb7

Branch: refs/heads/ignite-8201
Commit: 86d0bcb7d134686fb521dd7da5dc0e583d7f917c
Parents: df938d5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 15:19:58 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 15:19:58 2018 +0700

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     | 21 ++++++++++----------
 ...ettyRestProcessorAuthenticationSelfTest.java |  2 +-
 .../processors/rest/GridRestProcessor.java      |  2 +-
 3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 5201b33..38f22ba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -294,8 +294,9 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
     /**
      * @param content Content to check.
+     * @return JSON node with actual response.
      */
-    private JsonNode jsonCacheOperationResponse(String content, boolean bulk) throws IOException {
+    protected JsonNode assertResponseSucceeded(String content, boolean bulk) throws IOException {
         assertNotNull(content);
         assertFalse(content.isEmpty());
 
@@ -315,7 +316,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @param res Response.
      */
     private void assertCacheOperation(String content, Object res) throws IOException {
-        JsonNode ret = jsonCacheOperationResponse(content, false);
+        JsonNode ret = assertResponseSucceeded(content, false);
 
         assertEquals(String.valueOf(res), ret.isObject() ? ret.toString() : ret.asText());
     }
@@ -325,7 +326,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @param res Response.
      */
     private void assertCacheBulkOperation(String content, Object res) throws IOException {
-        JsonNode ret = jsonCacheOperationResponse(content, true);
+        JsonNode ret = assertResponseSucceeded(content, true);
 
         assertEquals(String.valueOf(res), ret.asText());
     }
@@ -334,7 +335,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @param content Content to check.
      */
     private void assertCacheMetrics(String content) throws IOException {
-        JsonNode ret = jsonCacheOperationResponse(content, true);
+        JsonNode ret = assertResponseSucceeded(content, true);
 
         assertTrue(ret.isObject());
     }
@@ -403,7 +404,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @throws IOException If failed.
      */
     private void checkJson(String json, Person p) throws IOException {
-        JsonNode res = jsonCacheOperationResponse(json, false);
+        JsonNode res = assertResponseSucceeded(json, false);
 
         assertEquals(p.id.intValue(), res.get("id").asInt());
         assertEquals(p.getOrganizationId().intValue(), res.get("orgId").asInt());
@@ -455,7 +456,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         info("Get command result: " + ret);
 
-        JsonNode res = jsonCacheOperationResponse(ret, false);
+        JsonNode res = assertResponseSucceeded(ret, false);
 
         assertEquals("Alex", res.get("NAME").asText());
         assertEquals(300, res.get("SALARY").asInt());
@@ -476,7 +477,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         info("Get command result: " + ret);
 
-        JsonNode json = jsonCacheOperationResponse(ret, false);
+        JsonNode json = assertResponseSucceeded(ret, false);
         assertEquals(ref1.name, json.get("name").asText());
 
         ref2.ref(ref1);
@@ -552,7 +553,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         info("Get command result: " + ret);
 
-        JsonNode res = jsonCacheOperationResponse(ret, false);
+        JsonNode res = assertResponseSucceeded(ret, false);
 
         assertEquals(p.id, res.get("id").asInt());
         assertEquals(p.name, res.get("name").asText());
@@ -637,7 +638,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         info("Get command result: " + ret);
 
-        JsonNode res = jsonCacheOperationResponse(ret, false);
+        JsonNode res = assertResponseSucceeded(ret, false);
 
         assertEquals(t.getKey(), res.get("key").asText());
         assertEquals(t.getValue(), res.get("value").asText());
@@ -775,7 +776,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         info("Get all command result: " + ret);
 
-        JsonNode res = jsonCacheOperationResponse(ret, true);
+        JsonNode res = assertResponseSucceeded(ret, true);
 
         assertTrue(res.isObject());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
index 68a96da..145d7e9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
@@ -134,7 +134,7 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
 
         String ret = content(null, GridRestCommand.VERSION);
 
-        assertResponseContainsError(ret, "The user name or password is incorrect");
+        assertResponseSucceeded(ret, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index a823be0..6ca1351 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -342,7 +342,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
 
                 assert res != null;
 
-                if (ctx.security().enabled() && !failed)
+                if ((ctx.authentication().enabled() || ctx.security().enabled()) && !failed)
                     res.sessionTokenBytes(req.sessionToken());
 
                 interceptResponse(res, req);


[09/10] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-8201

Posted by ak...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-8201


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe21ed57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe21ed57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe21ed57

Branch: refs/heads/ignite-8201
Commit: fe21ed5710396c6b2cb7b9cbb1f8f115e697dc49
Parents: 86d0bcb 74d2545
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 17:39:20 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 17:39:20 2018 +0700

----------------------------------------------------------------------
 .../configuration/DataStorageConfiguration.java |   6 +-
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../discovery/GridDiscoveryManager.java         |  10 +
 .../MetaPageUpdatePartitionDataRecord.java      |   2 +-
 .../processors/cache/CacheMetricsImpl.java      |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  38 +
 .../GridCachePartitionExchangeManager.java      |  17 +
 .../cache/GridCacheSharedContext.java           |   9 +-
 .../processors/cache/GridCacheUtils.java        |   2 +-
 .../cache/IgniteCacheOffheapManager.java        |   8 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  10 +-
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../distributed/dht/GridDhtLocalPartition.java  |   9 +-
 .../dht/GridDhtPartitionTopology.java           |   6 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  26 +-
 .../dht/GridDhtPartitionsStateValidator.java    | 255 +++++++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |  96 ++-
 .../GridDhtPartitionsSingleMessage.java         |  68 +-
 .../dht/preloader/InitNewCoordinatorFuture.java |   2 +-
 .../preloader/latch/ExchangeLatchManager.java   | 695 +++++++++++++++++++
 .../distributed/dht/preloader/latch/Latch.java  |  52 ++
 .../dht/preloader/latch/LatchAckMessage.java    | 165 +++++
 .../cache/distributed/near/GridNearTxLocal.java |  10 +
 .../persistence/GridCacheOffheapManager.java    |  10 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxManager.java     |  36 +-
 ...cheDhtLocalPartitionAfterRemoveSelfTest.java |   2 +-
 .../DataStorageConfigurationValidationTest.java |  33 +-
 .../processors/cache/IgniteCacheGroupsTest.java |   1 +
 ...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
 .../GridCachePartitionsStateValidationTest.java | 316 +++++++++
 ...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
 .../db/wal/IgniteWalFlushFailoverTest.java      |   4 +-
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |   4 +-
 .../TxOptimisticOnPartitionExchangeTest.java    | 322 +++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +
 .../testsuites/IgniteCacheTestSuite6.java       |   6 +
 .../java/org/apache/ignite/ml/knn/KNNUtils.java |  59 ++
 .../KNNClassificationTrainer.java               |  23 +-
 .../ml/knn/regression/KNNRegressionModel.java   |  87 +++
 .../ml/knn/regression/KNNRegressionTrainer.java |  40 ++
 .../ignite/ml/knn/regression/package-info.java  |  22 +
 .../apache/ignite/ml/knn/KNNRegressionTest.java | 143 ++++
 .../org/apache/ignite/ml/knn/KNNTestSuite.java  |   1 +
 .../ApiParity/IgniteConfigurationParityTest.cs  |   3 +-
 modules/web-console/frontend/app/app.config.js  |  14 +-
 .../components/modal-import-models/component.js |   4 +-
 .../app/components/page-profile/controller.js   |   4 +-
 .../frontend/app/modules/ace.module.js          |  47 +-
 .../services/AngularStrapSelect.decorator.js    |   5 +-
 .../services/AngularStrapTooltip.decorator.js   |   8 +-
 .../frontend/app/services/FormUtils.service.js  |   3 +-
 parent/pom.xml                                  |   4 +
 55 files changed, 3012 insertions(+), 106 deletions(-)
----------------------------------------------------------------------



[10/10] ignite git commit: IGNITE-8201 WIP.

Posted by ak...@apache.org.
IGNITE-8201 WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3780ac0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3780ac0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3780ac0a

Branch: refs/heads/ignite-8201
Commit: 3780ac0a09198e2918206d23ee515a2fe522352c
Parents: fe21ed5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 19:28:11 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 19:28:11 2018 +0700

----------------------------------------------------------------------
 ...ettyRestProcessorAuthenticationSelfTest.java | 43 ++++++++++----------
 .../http/jetty/GridJettyRestHandler.java        |  8 +++-
 2 files changed, 28 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3780ac0a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
index 145d7e9..1536d9d 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
@@ -41,12 +41,6 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
     private static final String DFLT_PWD = "ignite";
 
     /** */
-    private String user = DFLT_USER;
-
-    /** */
-    private String pwd = DFLT_PWD;
-
-    /** */
     private String tok = "";
 
     /** {@inheritDoc} */
@@ -60,8 +54,22 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        user = DFLT_USER;
-        pwd = DFLT_PWD;
+        // Authenticate and extract token.
+        if (F.isEmpty(tok)) {
+            String ret = content(null, GridRestCommand.AUTHENTICATE,
+                "user", DFLT_USER,
+                "password", DFLT_PWD);
+
+            int p1 = ret.indexOf("sessionToken");
+            int p2 = ret.indexOf('"', p1 + 16);
+
+            tok = ret.substring(p1 + 15, p2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean securityEnabled() {
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -104,14 +112,8 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
     @Override protected String restUrl() {
         String url = super.restUrl();
 
-        if (!F.isEmpty(user)) {
-            url += "user=" + user;
-
-            if (!F.isEmpty(pwd))
-                url += "&password=" + pwd;
-
-            url += '&';
-        }
+        if (!F.isEmpty(tok))
+            url += "sessionToken=" + tok + "&";
 
         return url;
     }
@@ -122,19 +124,18 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
     public void testAuthenticationCommand() throws Exception {
         String ret = content(null, GridRestCommand.AUTHENTICATE);
 
-        assertResponseContainsError(ret, "The user name or password is incorrect");
+        assertResponseSucceeded(ret, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testMissingCredentials() throws Exception {
-        user = null;
-        pwd = null;
+    public void testMissingSessionToken() throws Exception {
+        tok = null;
 
         String ret = content(null, GridRestCommand.VERSION);
 
-        assertResponseSucceeded(ret, false);
+        assertResponseContainsError(ret, "The user name or password is incorrect");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3780ac0a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index e70ef6a..7f20dca 100644
--- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -69,6 +69,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.client.GridClientCacheFlag.KEEP_BINARIES_MASK;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.AUTHENTICATE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CONTAINS_KEYS;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_ALL;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PUT_ALL;
@@ -852,8 +853,11 @@ public class GridJettyRestHandler extends AbstractHandler {
 
         restReq.command(cmd);
 
-        if (!credentials(params, USER_PARAM, PWD_PARAM, restReq))
-            credentials(params, IGNITE_LOGIN_PARAM, IGNITE_PASSWORD_PARAM, restReq);
+        // Check credentials only for AUTHENTICATE command.
+        if (cmd == AUTHENTICATE) {
+            if (!credentials(params, USER_PARAM, PWD_PARAM, restReq))
+                credentials(params, IGNITE_LOGIN_PARAM, IGNITE_PASSWORD_PARAM, restReq);
+        }
 
         String clientId = (String)params.get("clientId");
 


[02/10] ignite git commit: IGNITE-7871 Implemented additional synchronization phase for correct partition counters update

Posted by ak...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
new file mode 100644
index 0000000..bad1b61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message is used to send acks for {@link Latch} instances management.
+ */
+public class LatchAckMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Latch id. */
+    private String latchId;
+
+    /** Latch topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Flag indicates that ack is final. */
+    private boolean isFinal;
+
+    /**
+     * Constructor.
+     *
+     * @param latchId Latch id.
+     * @param topVer Latch topology version.
+     * @param isFinal Final acknowledgement flag.
+     */
+    public LatchAckMessage(String latchId, AffinityTopologyVersion topVer, boolean isFinal) {
+        this.latchId = latchId;
+        this.topVer = topVer;
+        this.isFinal = isFinal;
+    }
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public LatchAckMessage() {
+    }
+
+    /**
+     * @return Latch id.
+     */
+    public String latchId() {
+        return latchId;
+    }
+
+    /**
+     * @return Latch topology version.
+     */
+    public AffinityTopologyVersion topVer() {
+        return topVer;
+    }
+
+    /**
+     * @return {@code} if ack is final.
+     */
+    public boolean isFinal() {
+        return isFinal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeBoolean("isFinal", isFinal))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeString("latchId", latchId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                isFinal = reader.readBoolean("isFinal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                latchId = reader.readString("latchId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(LatchAckMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 135;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7785605..33f84f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3525,6 +3525,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
                 U.error(log, "Failed to prepare transaction: " + this, e);
             }
+            catch (Throwable t) {
+                fut.onDone(t);
+
+                throw t;
+            }
 
             if (err != null)
                 fut.rollbackOnError(err);
@@ -3544,6 +3549,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
                         U.error(log, "Failed to prepare transaction: " + this, e);
                     }
+                    catch (Throwable t) {
+                        fut.onDone(t);
+
+                        throw t;
+                    }
 
                     if (err != null)
                         fut.rollbackOnError(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5cfd92d..68ec83d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -189,7 +189,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             freeList.saveMetadata();
 
             long updCntr = store.updateCounter();
-            int size = store.fullSize();
+            long size = store.fullSize();
             long rmvId = globalRemoveId().get();
 
             PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
@@ -318,7 +318,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                                 partMetaId,
                                 updCntr,
                                 rmvId,
-                                size,
+                                (int)size, // TODO: Partition size may be long
                                 cntrsPageId,
                                 state == null ? -1 : (byte)state.ordinal(),
                                 pageCnt
@@ -549,7 +549,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             final int grpId,
             final int partId,
             final int currAllocatedPageCnt,
-            final int partSize
+            final long partSize
     ) {
         if (part != null) {
             boolean reserved = part.reserve();
@@ -1301,7 +1301,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public int fullSize() {
+        @Override public long fullSize() {
             try {
                 CacheDataStore delegate0 = init0(true);
 
@@ -1313,7 +1313,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public int cacheSize(int cacheId) {
+        @Override public long cacheSize(int cacheId) {
             try {
                 CacheDataStore delegate0 = init0(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9bfaaf3..945ef48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -490,7 +490,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     @Override public AffinityTopologyVersion topologyVersion() {
         AffinityTopologyVersion res = topVer;
 
-        if (res.equals(AffinityTopologyVersion.NONE)) {
+        if (res == null || res.equals(AffinityTopologyVersion.NONE)) {
             if (system()) {
                 AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fbdeca1..9fb8777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -545,10 +545,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param topVer Topology version.
      * @return Future that will be completed when all ongoing transactions are finished.
      */
-    public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) {
+    public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer) {
         GridCompoundFuture<IgniteInternalTx, Boolean> res =
             new CacheObjectsReleaseFuture<>(
-                "Tx",
+                "LocalTx",
                 topVer,
                 new IgniteReducer<IgniteInternalTx, Boolean>() {
                     @Override public boolean collect(IgniteInternalTx e) {
@@ -561,8 +561,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 });
 
         for (IgniteInternalTx tx : txs()) {
-            if (needWaitTransaction(tx, topVer))
+            if (needWaitTransaction(tx, topVer)) {
                 res.add(tx.finishFuture());
+            }
         }
 
         res.markInitialized();
@@ -571,6 +572,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Creates a future that will wait for finishing all tx updates on backups after all local transactions are finished.
+     *
+     * NOTE:
+     * As we send finish request to backup nodes after transaction successfully completed on primary node
+     * it's important to ensure that all updates from primary to backup are finished or at least remote transaction has created on backup node.
+     *
+     * @param finishLocalTxsFuture Local transactions finish future.
+     * @param topVer Topology version.
+     * @return Future that will be completed when all ongoing transactions are finished.
+     */
+    public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> finishLocalTxsFuture, AffinityTopologyVersion topVer) {
+        final GridCompoundFuture finishAllTxsFuture = new CacheObjectsReleaseFuture("AllTx", topVer);
+
+        // After finishing all local updates, wait for finishing all tx updates on backups.
+        finishLocalTxsFuture.listen(future -> {
+            finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer));
+            finishAllTxsFuture.markInitialized();
+        });
+
+        return finishAllTxsFuture;
+    }
+
+    /**
      * @param tx Transaction.
      * @param topVer Exchange version.
      * @return {@code True} if need wait transaction for exchange.
@@ -1834,12 +1858,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return Finish future for related remote transactions.
      */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
-        GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+    public IgniteInternalFuture<IgniteInternalTx> remoteTxFinishFuture(GridCacheVersion nearVer) {
+        GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>();
 
         for (final IgniteInternalTx tx : txs()) {
             if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
-                fut.add((IgniteInternalFuture) tx.finishFuture());
+                fut.add(tx.finishFuture());
         }
 
         fut.markInitialized();

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
index 7263656..702b188 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
@@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac
             cache = grid(g).cache(DEFAULT_CACHE_NAME);
 
             for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) {
-                int size = p.dataStore().fullSize();
+                long size = p.dataStore().fullSize();
 
                 assertTrue("Unexpected size: " + size, size <= 32);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 468bbc8..6c570d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
new file mode 100644
index 0000000..52cd033
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Lists;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests for {@link ExchangeLatchManager} functionality when latch coordinator is failed.
+ */
+public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbstractTest {
+    /** */
+    private static final String LATCH_NAME = "test";
+
+    /** 5 nodes. */
+    private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 0);
+
+    /** Wait before latch creation. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCreate = (mgr, syncLatch) -> {
+        try {
+            syncLatch.countDown();
+            syncLatch.await();
+
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            distributedLatch.countDown();
+
+            distributedLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** Wait before latch count down. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCountDown = (mgr, syncLatch) -> {
+        try {
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            syncLatch.countDown();
+            syncLatch.await();
+
+            distributedLatch.countDown();
+
+            distributedLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception ", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** Wait after all operations are successful. */
+    private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> all = (mgr, syncLatch) -> {
+        try {
+            Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+            distributedLatch.countDown();
+
+            syncLatch.countDown();
+
+            distributedLatch.await();
+
+            syncLatch.await();
+        } catch (Exception e) {
+            log.error("Unexpected exception ", e);
+
+            return false;
+        }
+
+        return true;
+    };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test scenarios description:
+     *
+     * We have existing coordinator and 4 other nodes.
+     * Each node do following operations:
+     * 1) Create latch
+     * 2) Countdown latch
+     * 3) Await latch
+     *
+     * While nodes do the operations we shutdown coordinator and next oldest node become new coordinator.
+     * We should check that new coordinator properly restored latch and all nodes finished latch completion successfully after that.
+     *
+     * Each node before coordinator shutdown can be in 3 different states:
+     *
+     * State {@link #beforeCreate} - Node didn't create latch yet.
+     * State {@link #beforeCountDown} - Node created latch but didn't count down it yet.
+     * State {@link #all} - Node created latch and count downed it.
+     *
+     * We should check important cases when future coordinator is in one of these states, and other 3 nodes have 3 different states.
+     */
+
+    /**
+     * Scenario 1:
+     *
+     * Node 1 state -> {@link #beforeCreate}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail1() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+            beforeCreate,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Scenario 2:
+     *
+     * Node 1 state -> {@link #beforeCountDown}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail2() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+            beforeCountDown,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Scenario 3:
+     *
+     * Node 1 state -> {@link #all}
+     * Node 2 state -> {@link #beforeCountDown}
+     * Node 3 state -> {@link #all}
+     * Node 4 state -> {@link #beforeCreate}
+     */
+    public void testCoordinatorFail3() throws Exception {
+        List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+            all,
+            beforeCountDown,
+            all,
+            beforeCreate
+        );
+
+        doTestCoordinatorFail(nodeStates);
+    }
+
+    /**
+     * Test latch coordinator fail with specified scenarios.
+     *
+     * @param nodeScenarios Node scenarios.
+     * @throws Exception If failed.
+     */
+    private void doTestCoordinatorFail(List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeScenarios) throws Exception {
+        IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5);
+        crd.cluster().active(true);
+
+        // Latch to synchronize node states.
+        CountDownLatch syncLatch = new CountDownLatch(5);
+
+        GridCompoundFuture finishAllLatches = new GridCompoundFuture();
+
+        AtomicBoolean hasErrors = new AtomicBoolean();
+
+        for (int node = 1; node < 5; node++) {
+            IgniteEx grid = grid(node);
+            ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
+            final int stateIdx = node - 1;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+                boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, syncLatch);
+                if (!success)
+                    hasErrors.set(true);
+            }, 1, "latch-runner-" + node);
+
+            finishAllLatches.add(fut);
+        }
+
+        finishAllLatches.markInitialized();
+
+        // Wait while all nodes reaches their states.
+        while (syncLatch.getCount() != 1) {
+            Thread.sleep(10);
+
+            if (hasErrors.get())
+                throw new Exception("All nodes should complete latches without errors");
+        }
+
+        crd.close();
+
+        // Resume progress for all nodes.
+        syncLatch.countDown();
+
+        // Wait for distributed latch completion.
+        finishAllLatches.get(5000);
+
+        Assert.assertFalse("All nodes should complete latches without errors", hasErrors.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
new file mode 100644
index 0000000..63d772a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class GridCachePartitionsStateValidationTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private boolean clientMode;
+
+    /** {@inheritDoc */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        cfg.setCommunicationSpi(new SingleMessageInterceptorCommunicationSpi(2));
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        clientMode = false;
+    }
+
+    /**
+     * Test that partitions state validation works correctly.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValidationIfPartitionCountersAreInconsistent() throws Exception {
+        IgniteEx ignite = (IgniteEx) startGrids(2);
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        // Modify update counter for some partition.
+        for (GridDhtLocalPartition partition : ignite.cachex(CACHE_NAME).context().topology().localPartitions()) {
+            partition.updateCounter(100500L);
+            break;
+        }
+
+        // Trigger exchange.
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        // Nothing should happen (just log error message) and we're still able to put data to corrupted cache.
+        ignite.cache(CACHE_NAME).put(0, 0);
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that all nodes send correct {@link GridDhtPartitionsSingleMessage} with consistent update counters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionCountersConsistencyOnExchange() throws Exception {
+        IgniteEx ignite = (IgniteEx) startGrids(4);
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        final String atomicCacheName = "atomic-cache";
+        final String txCacheName = "tx-cache";
+
+        clientMode = true;
+
+        Ignite client = startGrid(4);
+
+        clientMode = false;
+
+        IgniteCache atomicCache = client.getOrCreateCache(new CacheConfiguration<>(atomicCacheName)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        IgniteCache txCache = client.getOrCreateCache(new CacheConfiguration<>(txCacheName)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        for (int it = 0; it < 10; it++) {
+            SingleMessageInterceptorCommunicationSpi spi = (SingleMessageInterceptorCommunicationSpi) ignite.configuration().getCommunicationSpi();
+            spi.clear();
+
+            // Stop load future.
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            // Run atomic load.
+            IgniteInternalFuture atomicLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+                int k = 0;
+
+                while (!stop.get()) {
+                    k++;
+                    try {
+                        atomicCache.put(k, k);
+                    } catch (Exception ignored) {}
+                }
+            }, 1, "atomic-load");
+
+            // Run tx load.
+            IgniteInternalFuture txLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+                final int txOps = 5;
+
+                while (!stop.get()) {
+                    List<Integer> randomKeys = Stream.generate(() -> ThreadLocalRandom.current().nextInt(5))
+                        .limit(txOps)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+                    try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+                        for (Integer key : randomKeys)
+                            txCache.put(key, key);
+
+                        tx.commit();
+                    }
+                    catch (Exception ignored) { }
+                }
+            }, 4, "tx-load");
+
+            // Wait for some data.
+            Thread.sleep(1000);
+
+            // Prevent sending full message.
+            spi.blockFullMessage();
+
+            // Trigger exchange.
+            IgniteInternalFuture nodeStopFuture = GridTestUtils.runAsync(() -> stopGrid(3));
+
+            try {
+                spi.waitUntilAllSingleMessagesAreSent();
+
+                List<GridDhtPartitionsSingleMessage> interceptedMessages = spi.getMessages();
+
+                // Associate each message with existing node UUID.
+                Map<UUID, GridDhtPartitionsSingleMessage> messagesMap = new HashMap<>();
+                for (int i = 0; i < interceptedMessages.size(); i++)
+                    messagesMap.put(grid(i + 1).context().localNodeId(), interceptedMessages.get(i));
+
+                GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(ignite.context().cache().context());
+
+                // Validate partition update counters. If counters are not consistent, exception will be thrown.
+                validator.validatePartitionsUpdateCounters(ignite.cachex(atomicCacheName).context().topology(), messagesMap, Collections.emptySet());
+                validator.validatePartitionsUpdateCounters(ignite.cachex(txCacheName).context().topology(), messagesMap, Collections.emptySet());
+
+            } finally {
+                // Stop load and resume exchange.
+                spi.unblockFullMessage();
+
+                stop.set(true);
+
+                atomicLoadFuture.get();
+                txLoadFuture.get();
+                nodeStopFuture.get();
+            }
+
+            // Return grid to initial state.
+            startGrid(3);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * SPI which intercepts single messages during exchange.
+     */
+    private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        private static final List<GridDhtPartitionsSingleMessage> messages = new CopyOnWriteArrayList<>();
+
+        /** Future completes when {@link #singleMessagesThreshold} messages are sent to coordinator. */
+        private static final GridFutureAdapter allSingleMessagesSent = new GridFutureAdapter();
+
+        /** A number of single messages we're waiting for send. */
+        private final int singleMessagesThreshold;
+
+        /** Latch which blocks full message sending. */
+        private volatile CountDownLatch blockFullMsgLatch;
+
+        /**
+         * Constructor.
+         */
+        private SingleMessageInterceptorCommunicationSpi(int singleMessagesThreshold) {
+            this.singleMessagesThreshold = singleMessagesThreshold;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) ((GridIoMessage) msg).message();
+
+                // We're interesting for only exchange messages and when node is stopped.
+                if (singleMsg.exchangeId() != null && singleMsg.exchangeId().isLeft() && !singleMsg.client()) {
+                    messages.add(singleMsg);
+
+                    if (messages.size() == singleMessagesThreshold)
+                        allSingleMessagesSent.onDone();
+                }
+            }
+
+            try {
+                if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsFullMessage) {
+                    if (blockFullMsgLatch != null)
+                        blockFullMsgLatch.await();
+                }
+            }
+            catch (Exception ignored) { }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /** */
+        public void clear() {
+            messages.clear();
+            allSingleMessagesSent.reset();
+        }
+
+        /** */
+        public List<GridDhtPartitionsSingleMessage> getMessages() {
+            return Collections.unmodifiableList(messages);
+        }
+
+        /** */
+        public void blockFullMessage() {
+            blockFullMsgLatch = new CountDownLatch(1);
+        }
+
+        /** */
+        public void unblockFullMessage() {
+            blockFullMsgLatch.countDown();
+        }
+
+        /** */
+        public void waitUntilAllSingleMessagesAreSent() throws IgniteCheckedException {
+            allSingleMessagesSent.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
new file mode 100644
index 0000000..9ed8d54
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Test correct behaviour of {@link GridDhtPartitionsStateValidator} class.
+ */
+public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstractTest {
+    /** Mocks and stubs. */
+    private final UUID localNodeId = UUID.randomUUID();
+    /** */
+    private GridCacheSharedContext cctxMock;
+    /** */
+    private GridDhtPartitionTopology topologyMock;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Prepare mocks.
+        cctxMock = Mockito.mock(GridCacheSharedContext.class);
+        Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+        topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+        Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+        Mockito.when(topologyMock.groupId()).thenReturn(0);
+        Mockito.when(topologyMock.partitions()).thenReturn(3);
+
+        List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
+                partitionMock(0, 1, 1),
+                partitionMock(1, 2, 2),
+                partitionMock(2, 3, 3)
+        );
+        Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+        Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+    }
+
+    /**
+     * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}.
+     */
+    private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+        GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+        Mockito.when(partitionMock.id()).thenReturn(id);
+        Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+        Mockito.when(partitionMock.fullSize()).thenReturn(size);
+        return partitionMock;
+    }
+
+    /**
+     * @return Message containing specified {@code countersMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, T2<Long, Long>> countersMap) {
+        GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+        msg.addPartitionUpdateCounters(0, countersMap);
+        return msg;
+    }
+
+    /**
+     * @return Message containing specified {@code sizesMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> sizesMap) {
+        GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+        msg.addPartitionSizes(0, sizesMap);
+        return msg;
+    }
+
+    /**
+     * Test partition update counters validation.
+     */
+    public void testPartitionCountersValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 (zero counter) we have inconsistent update counters.
+        Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+        updateCountersMap.put(0, new T2<>(2L, 2L));
+        updateCountersMap.put(1, new T2<>(2L, 2L));
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromUpdateCounters(updateCountersMap));
+        messages.put(ignoreNode, fromUpdateCounters(updateCountersMap));
+
+        GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, updateCounter))
+        Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsUpdateCounters(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+        Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+        Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+        Assert.assertTrue(result.get(2).get(remoteNode) == 0L);
+    }
+
+    /**
+     * Test partition cache sizes validation.
+     */
+    public void testPartitionCacheSizesValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 we have inconsistent cache sizes.
+        Map<Integer, Long> cacheSizesMap = new HashMap<>();
+        cacheSizesMap.put(0, 2L);
+        cacheSizesMap.put(1, 2L);
+        cacheSizesMap.put(2, 2L);
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromCacheSizes(cacheSizesMap));
+        messages.put(ignoreNode, fromCacheSizes(cacheSizesMap));
+
+        GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, cacheSize))
+        Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsSizes(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+        Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+        Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+        Assert.assertTrue(result.get(2).get(remoteNode) == 2L);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
new file mode 100644
index 0000000..03ea0f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T1;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class TxOptimisticOnPartitionExchangeTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Tx size. */
+    private static final int TX_SIZE = 20 * NODES_CNT;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Logger started. */
+    private static volatile boolean msgInterception;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi(log()));
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConsistencyOnPartitionExchange() throws Exception {
+        doTest(SERIALIZABLE, true);
+        doTest(READ_COMMITTED, true);
+        doTest(SERIALIZABLE, false);
+        doTest(READ_COMMITTED, false);
+    }
+
+    /**
+     * @param isolation {@link TransactionIsolation}.
+     * @param txInitiatorPrimary False If the transaction does not use the keys of the node that initiated it.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void doTest(final TransactionIsolation isolation, boolean txInitiatorPrimary) throws Exception {
+        final CountDownLatch txStarted = new CountDownLatch(1);
+
+        final IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+        final Map<Integer, Integer> txValues = new TreeMap<>();
+
+        ClusterNode node = ignite(0).cluster().node();
+
+        GridCacheAffinityManager affinity = ((IgniteCacheProxy)cache).context().affinity();
+
+        for (int i = 0; txValues.size() < TX_SIZE; i++) {
+            if (!txInitiatorPrimary && node.equals(affinity.primaryByKey(i, NONE)))
+                continue;
+
+            txValues.put(i, i);
+        }
+
+        TestCommunicationSpi.init();
+
+        msgInterception = true;
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                try (Transaction tx = ignite(0).transactions().txStart(OPTIMISTIC, isolation)) {
+                    info(">>> TX started.");
+
+                    txStarted.countDown();
+
+                    cache.putAll(txValues);
+
+                    tx.commit();
+
+                    info(">>> TX committed.");
+                }
+
+                return null;
+            }
+        });
+
+        txStarted.await();
+
+        try {
+            info(">>> Grid starting.");
+
+            IgniteEx ignite = startGrid(NODES_CNT);
+
+            info(">>> Grid started.");
+
+            fut.get();
+
+            awaitPartitionMapExchange();
+
+            msgInterception = false;
+
+            IgniteCache<Object, Object> cacheStartedNode = ignite.cache(CACHE_NAME);
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                Set<Object> keys = cacheStartedNode.getAll(txValues.keySet()).keySet();
+
+                assertEquals(txValues.keySet(), new TreeSet<>(keys));
+
+                tx.commit();
+            }
+        }
+        finally {
+            msgInterception = false;
+
+            stopGrid(NODES_CNT);
+        }
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Partition single message sent from added node. */
+        private static volatile CountDownLatch partSingleMsgSentFromAddedNode;
+
+        /** Partition supply message sent count. */
+        private static final AtomicInteger partSupplyMsgSentCnt = new AtomicInteger();
+
+        /** Logger. */
+        private IgniteLogger log;
+
+        /**
+         * @param log Logger.
+         */
+        public TestCommunicationSpi(IgniteLogger log) {
+            this.log = log;
+        }
+
+        /**
+         *
+         */
+        public static void init() {
+            partSingleMsgSentFromAddedNode = new CountDownLatch(1);
+
+            partSupplyMsgSentCnt.set(0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msgInterception) {
+                if (msg instanceof GridIoMessage) {
+                    final Message msg0 = ((GridIoMessage)msg).message();
+
+                    String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+                    int nodeIdx = Integer.parseInt(locNodeId.substring(locNodeId.length() - 3));
+
+                    if (nodeIdx == 0) {
+                        if (msg0 instanceof GridNearTxPrepareRequest || msg0 instanceof GridDhtTxPrepareRequest) {
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    partSingleMsgSentFromAddedNode.await();
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+
+                        }
+                        else if (msg0 instanceof GridNearTxFinishRequest || msg0 instanceof GridDhtTxFinishRequest) {
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    final T1<Integer> i = new T1<>(0);
+
+                                    while (waitForCondition(new GridAbsPredicate() {
+                                        @Override public boolean apply() {
+                                            return partSupplyMsgSentCnt.get() > i.get();
+                                        }
+                                    }, i.get() == 0 ? 5_000 : 500))
+                                        i.set(partSupplyMsgSentCnt.get());
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+                        }
+                    }
+                    else if (nodeIdx == NODES_CNT && msg0 instanceof GridDhtPartitionsSingleMessage)
+                        partSingleMsgSentFromAddedNode.countDown();
+
+                    if (msg0 instanceof GridDhtPartitionSupplyMessage)
+                        partSupplyMsgSentCnt.incrementAndGet();
+                }
+            }
+
+            sendMessage(node, msg, ackC, msgInterception);
+        }
+
+        /**
+         * @param node Node.
+         * @param msg Message.
+         * @param ackC Ack closure.
+         * @param logMsg Log Messages.
+         */
+        private void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC,
+            boolean logMsg
+        ) throws IgniteSpiException {
+            if (logMsg) {
+                String id = node.id().toString();
+                String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                log.info(
+                    String.format(">>> Output msg[type=%s, fromNode= %s, toNode=%s]",
+                        msg0.getClass().getSimpleName(),
+                        locNodeId.charAt(locNodeId.length() - 1),
+                        id.charAt(id.length() - 1)
+                    )
+                );
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..0612615 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheT
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
@@ -292,6 +294,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
         suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class);
         suite.addTestSuite(CacheDeferredDeleteQueueTest.class);
+        suite.addTestSuite(GridCachePartitionsStateValidatorSelfTest.class);
+        suite.addTestSuite(GridCachePartitionsStateValidationTest.class);
 
         suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f8add30..415479d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
 import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest;
 import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticT
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
@@ -93,6 +95,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
         suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class);
         suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
 
+        suite.addTestSuite(TxOptimisticOnPartitionExchangeTest.class);
+
+        suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class);
+
         return suite;
     }
 }


[07/10] ignite git commit: IGNITE-8111 Add extra validation for WAL segment size - Fixes #3768.

Posted by ak...@apache.org.
IGNITE-8111 Add extra validation for WAL segment size - Fixes #3768.

Signed-off-by: dpavlov <dp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97524668
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97524668
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97524668

Branch: refs/heads/ignite-8201
Commit: 975246687c9d143830501340e597a35d1a4c492a
Parents: a4653b7
Author: denis.garus <d....@isimplelab.com>
Authored: Wed Apr 11 13:01:22 2018 +0300
Committer: dpavlov <dp...@apache.org>
Committed: Wed Apr 11 13:01:22 2018 +0300

----------------------------------------------------------------------
 .../configuration/DataStorageConfiguration.java |  6 ++--
 .../DataStorageConfigurationValidationTest.java | 33 ++++++++++++++++++--
 .../db/wal/IgniteWalFlushFailoverTest.java      |  4 +--
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |  4 +--
 4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index a433760..747efd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -533,12 +533,14 @@ public class DataStorageConfiguration implements Serializable {
 
     /**
      * Sets size of a WAL segment.
+     * If value is not set (or zero), {@link #DFLT_WAL_SEGMENT_SIZE} will be used.
      *
-     * @param walSegmentSize WAL segment size. 64 MB is used by default.  Maximum value is 2Gb.
+     * @param walSegmentSize WAL segment size. Value must be between 512Kb and 2Gb.
      * @return {@code This} for chaining.
      */
     public DataStorageConfiguration setWalSegmentSize(int walSegmentSize) {
-        A.ensure(walSegmentSize >= 0, "WAL segment size must be non-negative and less than 2 Gb.");
+        if (walSegmentSize != 0)
+            A.ensure(walSegmentSize >= 512 * 1024, "WAL segment size must be between 512Kb and 2Gb.");
 
         this.walSegmentSize = walSegmentSize;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
index 7f667ee..9471a82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
@@ -31,11 +31,10 @@ public class DataStorageConfigurationValidationTest extends TestCase {
      *
      * @throws Exception If failed.
      */
-    public void testWalSegmentSizeOveflow() throws Exception {
+    public void testWalSegmentSizeOverflow() throws Exception {
         final DataStorageConfiguration cfg = new DataStorageConfiguration();
 
         GridTestUtils.assertThrows(null, new Callable<Void>() {
-            /** {@inheritDoc} */
             @Override public Void call() {
                 cfg.setWalSegmentSize(1 << 31);
 
@@ -43,4 +42,34 @@ public class DataStorageConfigurationValidationTest extends TestCase {
             }
         }, IllegalArgumentException.class, null);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetWalSegmentSizeShouldThrowExceptionWhenSizeLessThen512Kb() throws Exception {
+        final DataStorageConfiguration cfg = new DataStorageConfiguration();
+
+        GridTestUtils.assertThrows(null, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cfg.setWalSegmentSize(512 * 1024 - 1);
+
+                return null;
+            }
+        }, IllegalArgumentException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetWalSegmentSizeShouldBeOkWhenSizeBetween512KbAnd2Gb() throws Exception {
+        final DataStorageConfiguration cfg = new DataStorageConfiguration();
+
+        cfg.setWalSegmentSize(512 * 1024);
+
+        assertEquals(512 * 1024, cfg.getWalSegmentSize());
+
+        cfg.setWalSegmentSize(Integer.MAX_VALUE);
+
+        assertEquals(Integer.MAX_VALUE, cfg.getWalSegmentSize());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 042a447..351a42c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -92,8 +92,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
                 .setDefaultDataRegionConfiguration(
                  new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
                 .setWalMode(WALMode.BACKGROUND)
-                .setWalBufferSize(128 * 1024)// Setting WAL Segment size to high values forces flushing by timeout.
-                .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+                .setWalBufferSize(1024 * 1024)// Setting WAL Segment size to high values forces flushing by timeout.
+                .setWalSegmentSize(flushByTimeout ? 2 * 1024 * 1024 : 512 * 1024);
 
         cfg.setDataStorageConfiguration(memCfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index fe16328..cc0986a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -106,8 +106,8 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
                 .setDefaultDataRegionConfiguration(
                         new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
                 .setWalMode(this.walMode())
-                .setWalSegmentSize(50_000)
-                .setWalBufferSize(50_000);
+                .setWalSegmentSize(512 * 1024)
+                .setWalBufferSize(512 * 1024);
 
         cfg.setDataStorageConfiguration(memCfg);
 


[08/10] ignite git commit: IGNITE-4091 Web Console: Refactored using of internal Angular API.

Posted by ak...@apache.org.
IGNITE-4091 Web Console: Refactored using of internal Angular API.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74d25456
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74d25456
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74d25456

Branch: refs/heads/ignite-8201
Commit: 74d254564a44a95db9945652c9b579ed6b431ee9
Parents: 9752466
Author: Alexander Kalinin <ve...@yandex.ru>
Authored: Wed Apr 11 17:09:41 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 17:09:41 2018 +0700

----------------------------------------------------------------------
 modules/web-console/frontend/app/app.config.js  | 14 +++---
 .../components/modal-import-models/component.js |  4 +-
 .../app/components/page-profile/controller.js   |  4 +-
 .../frontend/app/modules/ace.module.js          | 47 ++++++++++----------
 .../services/AngularStrapSelect.decorator.js    |  5 ++-
 .../services/AngularStrapTooltip.decorator.js   |  8 ++--
 .../frontend/app/services/FormUtils.service.js  |  3 +-
 7 files changed, 45 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/app.config.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/app.config.js b/modules/web-console/frontend/app/app.config.js
index 9d8dc99..e2bc057 100644
--- a/modules/web-console/frontend/app/app.config.js
+++ b/modules/web-console/frontend/app/app.config.js
@@ -43,7 +43,7 @@ igniteConsoleCfg.config(['$animateProvider', ($animateProvider) => {
 
 // AngularStrap modal popup configuration.
 igniteConsoleCfg.config(['$modalProvider', ($modalProvider) => {
-    angular.extend($modalProvider.defaults, {
+    Object.assign($modalProvider.defaults, {
         animation: 'am-fade-and-scale',
         placement: 'center',
         html: true
@@ -52,7 +52,7 @@ igniteConsoleCfg.config(['$modalProvider', ($modalProvider) => {
 
 // AngularStrap popover configuration.
 igniteConsoleCfg.config(['$popoverProvider', ($popoverProvider) => {
-    angular.extend($popoverProvider.defaults, {
+    Object.assign($popoverProvider.defaults, {
         trigger: 'manual',
         placement: 'right',
         container: 'body',
@@ -62,7 +62,7 @@ igniteConsoleCfg.config(['$popoverProvider', ($popoverProvider) => {
 
 // AngularStrap tooltips configuration.
 igniteConsoleCfg.config(['$tooltipProvider', ($tooltipProvider) => {
-    angular.extend($tooltipProvider.defaults, {
+    Object.assign($tooltipProvider.defaults, {
         container: 'body',
         delay: {show: 150, hide: 150},
         placement: 'right',
@@ -73,7 +73,7 @@ igniteConsoleCfg.config(['$tooltipProvider', ($tooltipProvider) => {
 
 // AngularStrap select (combobox) configuration.
 igniteConsoleCfg.config(['$selectProvider', ($selectProvider) => {
-    angular.extend($selectProvider.defaults, {
+    Object.assign($selectProvider.defaults, {
         container: 'body',
         maxLength: '5',
         allText: 'Select All',
@@ -87,7 +87,7 @@ igniteConsoleCfg.config(['$selectProvider', ($selectProvider) => {
 
 // AngularStrap alerts configuration.
 igniteConsoleCfg.config(['$alertProvider', ($alertProvider) => {
-    angular.extend($alertProvider.defaults, {
+    Object.assign($alertProvider.defaults, {
         container: 'body',
         placement: 'top-right',
         duration: '5',
@@ -99,7 +99,7 @@ igniteConsoleCfg.config(['$alertProvider', ($alertProvider) => {
 
 // AngularStrap dropdowns () configuration.
 igniteConsoleCfg.config(['$dropdownProvider', ($dropdownProvider) => {
-    angular.extend($dropdownProvider.defaults, {
+    Object.assign($dropdownProvider.defaults, {
         templateUrl: dropdownTemplateUrl,
         animation: ''
     });
@@ -107,7 +107,7 @@ igniteConsoleCfg.config(['$dropdownProvider', ($dropdownProvider) => {
 
 // AngularStrap dropdowns () configuration.
 igniteConsoleCfg.config(['$datepickerProvider', ($datepickerProvider) => {
-    angular.extend($datepickerProvider.defaults, {
+    Object.assign($datepickerProvider.defaults, {
         autoclose: true,
         iconLeft: 'icon-datepicker-left',
         iconRight: 'icon-datepicker-right'

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js b/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
index 7f852b0..813c998 100644
--- a/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
+++ b/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
@@ -84,7 +84,7 @@ const DFLT_REPLICATED_CACHE = {
 const CACHE_TEMPLATES = [DFLT_PARTITIONED_CACHE, DFLT_REPLICATED_CACHE];
 
 export class ModalImportModels {
-    /** 
+    /**
      * Cluster ID to import models into
      * @type {string}
      */
@@ -771,7 +771,7 @@ export class ModalImportModels {
 
                 // Prepare caches for generation.
                 if (table.action === IMPORT_DM_NEW_CACHE) {
-                    const newCache = angular.copy(this.loadedCaches[table.cacheOrTemplate]);
+                    const newCache = _.cloneDeep(this.loadedCaches[table.cacheOrTemplate]);
 
                     batchAction.newCache = newCache;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/components/page-profile/controller.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/components/page-profile/controller.js b/modules/web-console/frontend/app/components/page-profile/controller.js
index 05fe118..c67a603 100644
--- a/modules/web-console/frontend/app/components/page-profile/controller.js
+++ b/modules/web-console/frontend/app/components/page-profile/controller.js
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+import _ from 'lodash';
+
 export default class PageProfileController {
     static $inject = [
         '$rootScope', '$scope', '$http', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteFocus', 'IgniteConfirm', 'IgniteCountries', 'User'
@@ -28,7 +30,7 @@ export default class PageProfileController {
         this.ui = {};
 
         this.User.read()
-            .then((user) => this.ui.user = angular.copy(user));
+            .then((user) => this.ui.user = _.cloneDeep(user));
 
         this.ui.countries = this.Countries.getAll();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/modules/ace.module.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/ace.module.js b/modules/web-console/frontend/app/modules/ace.module.js
index a28536a..6a6e70a 100644
--- a/modules/web-console/frontend/app/modules/ace.module.js
+++ b/modules/web-console/frontend/app/modules/ace.module.js
@@ -16,12 +16,13 @@
  */
 
 import angular from 'angular';
+import _ from 'lodash';
 
 angular
     .module('ignite-console.ace', [])
     .constant('igniteAceConfig', {})
     .directive('igniteAce', ['igniteAceConfig', (aceConfig) => {
-        if (angular.isUndefined(window.ace))
+        if (_.isUndefined(window.ace))
             throw new Error('ignite-ace need ace to work... (o rly?)');
 
         /**
@@ -43,7 +44,7 @@ angular
          */
         const setOptions = (acee, session, opts) => {
             // Sets the ace worker path, if running from concatenated or minified source.
-            if (angular.isDefined(opts.workerPath)) {
+            if (!_.isUndefined(opts.workerPath)) {
                 const config = window.ace.acequire('ace/config');
 
                 config.set('workerPath', opts.workerPath);
@@ -53,26 +54,26 @@ angular
             _.forEach(opts.require, (n) => window.ace.acequire(n));
 
             // Boolean options.
-            if (angular.isDefined(opts.showGutter))
+            if (!_.isUndefined(opts.showGutter))
                 acee.renderer.setShowGutter(opts.showGutter);
 
-            if (angular.isDefined(opts.useWrapMode))
+            if (!_.isUndefined(opts.useWrapMode))
                 session.setUseWrapMode(opts.useWrapMode);
 
-            if (angular.isDefined(opts.showInvisibles))
+            if (!_.isUndefined(opts.showInvisibles))
                 acee.renderer.setShowInvisibles(opts.showInvisibles);
 
-            if (angular.isDefined(opts.showIndentGuides))
+            if (!_.isUndefined(opts.showIndentGuides))
                 acee.renderer.setDisplayIndentGuides(opts.showIndentGuides);
 
-            if (angular.isDefined(opts.useSoftTabs))
+            if (!_.isUndefined(opts.useSoftTabs))
                 session.setUseSoftTabs(opts.useSoftTabs);
 
-            if (angular.isDefined(opts.showPrintMargin))
+            if (!_.isUndefined(opts.showPrintMargin))
                 acee.setShowPrintMargin(opts.showPrintMargin);
 
             // Commands.
-            if (angular.isDefined(opts.disableSearch) && opts.disableSearch) {
+            if (!_.isUndefined(opts.disableSearch) && opts.disableSearch) {
                 acee.commands.addCommands([{
                     name: 'unfind',
                     bindKey: {
@@ -85,21 +86,21 @@ angular
             }
 
             // Base options.
-            if (angular.isString(opts.theme))
+            if (_.isString(opts.theme))
                 acee.setTheme('ace/theme/' + opts.theme);
 
-            if (angular.isString(opts.mode))
+            if (_.isString(opts.mode))
                 session.setMode('ace/mode/' + opts.mode);
 
-            if (angular.isDefined(opts.firstLineNumber)) {
-                if (angular.isNumber(opts.firstLineNumber))
+            if (!_.isUndefined(opts.firstLineNumber)) {
+                if (_.isNumber(opts.firstLineNumber))
                     session.setOption('firstLineNumber', opts.firstLineNumber);
-                else if (angular.isFunction(opts.firstLineNumber))
+                else if (_.isFunction(opts.firstLineNumber))
                     session.setOption('firstLineNumber', opts.firstLineNumber());
             }
 
             // Advanced options.
-            if (angular.isDefined(opts.advanced)) {
+            if (!_.isUndefined(opts.advanced)) {
                 for (const key in opts.advanced) {
                     if (opts.advanced.hasOwnProperty(key)) {
                         // Create a javascript object with the key and value.
@@ -112,7 +113,7 @@ angular
             }
 
             // Advanced options for the renderer.
-            if (angular.isDefined(opts.rendererOptions)) {
+            if (!_.isUndefined(opts.rendererOptions)) {
                 for (const key in opts.rendererOptions) {
                     if (opts.rendererOptions.hasOwnProperty(key)) {
                         // Create a javascript object with the key and value.
@@ -126,7 +127,7 @@ angular
 
             // onLoad callbacks.
             _.forEach(opts.callbacks, (cb) => {
-                if (angular.isFunction(cb))
+                if (_.isFunction(cb))
                     cb(acee);
             });
         };
@@ -147,7 +148,7 @@ angular
                  *
                  * @type object
                  */
-                let opts = angular.extend({}, options, scope.$eval(attrs.igniteAce));
+                let opts = Object.assign({}, options, scope.$eval(attrs.igniteAce));
 
                 /**
                  * ACE editor.
@@ -191,9 +192,9 @@ angular
                             !scope.$$phase && !scope.$root.$$phase)
                             scope.$eval(() => ngModel.$setViewValue(newValue));
 
-                        if (angular.isDefined(callback)) {
+                        if (!_.isUndefined(callback)) {
                             scope.$evalAsync(() => {
-                                if (angular.isFunction(callback))
+                                if (_.isFunction(callback))
                                     callback([e, acee]);
                                 else
                                     throw new Error('ignite-ace use a function as callback');
@@ -210,10 +211,10 @@ angular
                     form && form.$removeControl(ngModel);
 
                     ngModel.$formatters.push((value) => {
-                        if (angular.isUndefined(value) || value === null)
+                        if (_.isUndefined(value) || value === null)
                             return '';
 
-                        if (angular.isObject(value) || angular.isArray(value))
+                        if (_.isObject(value) || _.isArray(value))
                             throw new Error('ignite-ace cannot use an object or an array as a model');
 
                         return value;
@@ -229,7 +230,7 @@ angular
                     if (current === previous)
                         return;
 
-                    opts = angular.extend({}, options, scope.$eval(attrs.igniteAce));
+                    opts = Object.assign({}, options, scope.$eval(attrs.igniteAce));
 
                     opts.callbacks = [opts.onLoad];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js b/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
index 39f7ccd..32fa167 100644
--- a/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
+++ b/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
@@ -16,6 +16,7 @@
  */
 
 import angular from 'angular';
+import _ from 'lodash';
 
 /**
  * Special decorator that fix problem in AngularStrap selectAll / deselectAll methods.
@@ -27,12 +28,12 @@ export default angular.module('mgcrea.ngStrap.select')
             const delegate = $delegate(element, controller, config);
 
             // Common vars.
-            const options = angular.extend({}, $delegate.defaults, config);
+            const options = Object.assign({}, $delegate.defaults, config);
 
             const scope = delegate.$scope;
 
             const valueByIndex = (index) => {
-                if (angular.isUndefined(scope.$matches[index]))
+                if (_.isUndefined(scope.$matches[index]))
                     return null;
 
                 return scope.$matches[index].value;

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js b/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
index d01a450..fa59f32 100644
--- a/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
+++ b/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
@@ -16,7 +16,7 @@
  */
 
 import angular from 'angular';
-import flow from 'lodash/flow';
+import _ from 'lodash';
 
 /**
  * Decorator that fix problem in AngularStrap $tooltip.
@@ -62,7 +62,7 @@ export default angular
 
                 scope.$emit(options.prefixEvent + '.hide.before', $tooltip);
 
-                if (angular.isDefined(options.onBeforeHide) && angular.isFunction(options.onBeforeHide))
+                if (!_.isUndefined(options.onBeforeHide) && _.isFunction(options.onBeforeHide))
                     options.onBeforeHide($tooltip);
 
                 $tooltip.$isShown = scope.$isShown = false;
@@ -82,8 +82,8 @@ export default angular
             const $tooltip = $delegate(el, config);
 
             $tooltip.$referenceElement = el;
-            $tooltip.destroy = flow($tooltip.destroy, () => $tooltip.$referenceElement = null);
-            $tooltip.$applyPlacement = flow($tooltip.$applyPlacement, () => {
+            $tooltip.destroy = _.flow($tooltip.destroy, () => $tooltip.$referenceElement = null);
+            $tooltip.$applyPlacement = _.flow($tooltip.$applyPlacement, () => {
                 if (!$tooltip.$element)
                     return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/FormUtils.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/FormUtils.service.js b/modules/web-console/frontend/app/services/FormUtils.service.js
index f22d4bc..da1d737 100644
--- a/modules/web-console/frontend/app/services/FormUtils.service.js
+++ b/modules/web-console/frontend/app/services/FormUtils.service.js
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import _ from 'lodash';
 
 export default ['IgniteFormUtils', ['$window', 'IgniteFocus', ($window, Focus) => {
     function ensureActivePanel(ui, pnl, focusId) {
@@ -41,7 +42,7 @@ export default ['IgniteFormUtils', ['$window', 'IgniteFocus', ($window, Focus) =
                 if (!activePanels || activePanels.length < 1)
                     ui.activePanels = [idx];
                 else if (!_.includes(activePanels, idx)) {
-                    const newActivePanels = angular.copy(activePanels);
+                    const newActivePanels = _.cloneDeep(activePanels);
 
                     newActivePanels.push(idx);
 


[05/10] ignite git commit: IGNITE-8216 Fixed javadoc for release build

Posted by ak...@apache.org.
IGNITE-8216 Fixed javadoc for release build


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6557fe62
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6557fe62
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6557fe62

Branch: refs/heads/ignite-8201
Commit: 6557fe62696ac24c740e445b53482da298b59b27
Parents: 780fc07
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Apr 11 12:28:40 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 11 12:28:40 2018 +0300

----------------------------------------------------------------------
 parent/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6557fe62/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 16a9395..3decc16 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -404,6 +404,10 @@
                                 <packages>org.apache.ignite.spi.eventstorage*</packages>
                             </group>
                             <group>
+                                <title>Communication Failure Detection</title>
+                                <packages>org.apache.ignite.failure</packages>
+                            </group>
+                            <group>
                                 <title>Segmentation Detection</title>
                                 <packages>org.apache.ignite.plugin.segmentation</packages>
                             </group>


[03/10] ignite git commit: IGNITE-7871 Implemented additional synchronization phase for correct partition counters update

Posted by ak...@apache.org.
IGNITE-7871 Implemented additional synchronization phase for correct partition counters update


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da77b981
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da77b981
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da77b981

Branch: refs/heads/ignite-8201
Commit: da77b9818a70495b7afdf6899ebd9180dadd7f68
Parents: f4de6df
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 11 11:23:46 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 11 11:23:46 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../discovery/GridDiscoveryManager.java         |  10 +
 .../MetaPageUpdatePartitionDataRecord.java      |   2 +-
 .../processors/cache/CacheMetricsImpl.java      |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |  38 +
 .../GridCachePartitionExchangeManager.java      |  17 +
 .../cache/GridCacheSharedContext.java           |   9 +-
 .../processors/cache/GridCacheUtils.java        |   2 +-
 .../cache/IgniteCacheOffheapManager.java        |   8 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  10 +-
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../distributed/dht/GridDhtLocalPartition.java  |   9 +-
 .../dht/GridDhtPartitionTopology.java           |   6 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  26 +-
 .../dht/GridDhtPartitionsStateValidator.java    | 255 +++++++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |  96 ++-
 .../GridDhtPartitionsSingleMessage.java         |  68 +-
 .../dht/preloader/InitNewCoordinatorFuture.java |   2 +-
 .../preloader/latch/ExchangeLatchManager.java   | 695 +++++++++++++++++++
 .../distributed/dht/preloader/latch/Latch.java  |  52 ++
 .../dht/preloader/latch/LatchAckMessage.java    | 165 +++++
 .../cache/distributed/near/GridNearTxLocal.java |  10 +
 .../persistence/GridCacheOffheapManager.java    |  10 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxManager.java     |  36 +-
 ...cheDhtLocalPartitionAfterRemoveSelfTest.java |   2 +-
 .../processors/cache/IgniteCacheGroupsTest.java |   1 +
 ...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
 .../GridCachePartitionsStateValidationTest.java | 316 +++++++++
 ...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
 .../TxOptimisticOnPartitionExchangeTest.java    | 322 +++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +
 .../testsuites/IgniteCacheTestSuite6.java       |   6 +
 35 files changed, 2568 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 1227e8c..0b2d41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -124,7 +124,10 @@ public enum GridTopic {
     TOPIC_METRICS,
 
     /** */
-    TOPIC_AUTH;
+    TOPIC_AUTH,
+
+    /** */
+    TOPIC_EXCHANGE;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5616fd0..581c32e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
 import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
 import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -921,6 +922,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 135:
+                msg = new LatchAckMessage();
+
+                break;
+
             // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a1d84e5..400bb5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -793,6 +793,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onDisconnected();
 
+                    if (!locJoin.isDone())
+                        locJoin.onDone(new IgniteCheckedException("Node disconnected"));
+
                     locJoin = new GridFutureAdapter<>();
 
                     registeredCaches.clear();
@@ -2142,6 +2145,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @return Local join future.
+     */
+    public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() {
+        return locJoin;
+    }
+
+    /**
      * @param msg Custom message.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index bafbf47..e5bd343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -32,7 +32,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
     /** */
     private long globalRmvId;
 
-    /** */
+    /** TODO: Partition size may be long */
     private int partSize;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6fae8fe..b402ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -792,7 +792,7 @@ public class CacheMetricsImpl implements CacheMetrics {
                     if (cctx.cache() == null)
                         continue;
 
-                    int cacheSize = part.dataStore().cacheSize(cctx.cacheId());
+                    long cacheSize = part.dataStore().cacheSize(cctx.cacheId());
 
                     offHeapEntriesCnt += cacheSize;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a9fa3c7..fade833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -314,6 +316,42 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Creates a future that will wait for finishing all remote transactions (primary -> backup)
+     * with topology version less or equal to {@code topVer}.
+     *
+     * @param topVer Topology version.
+     * @return Compound future of all {@link GridDhtTxFinishFuture} futures.
+     */
+    public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) {
+        GridCompoundFuture<?, ?> res = new CacheObjectsReleaseFuture<>("RemoteTx", topVer);
+
+        for (GridCacheFuture<?> fut : futs.values()) {
+            if (fut instanceof GridDhtTxFinishFuture) {
+                GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture) fut;
+
+                if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer))
+                    res.add(ignoreErrors(finishTxFuture));
+            }
+        }
+
+        res.markInitialized();
+
+        return res;
+    }
+
+    /**
+     * Future wrapper which ignores any underlying future errors.
+     *
+     * @param f Underlying future.
+     * @return Future wrapper which ignore any underlying future errors.
+     */
+    private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
+        GridFutureAdapter<?> wrapper = new GridFutureAdapter();
+        f.listen(future -> wrapper.onDone());
+        return wrapper;
+    }
+
+    /**
      * @param leftNodeId Left node ID.
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1a0e65f..20a3ccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -216,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** For tests only. */
     private volatile AffinityTopologyVersion exchMergeTestWaitVer;
 
+    /** Distributed latch manager. */
+    private ExchangeLatchManager latchMgr;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -309,6 +313,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         exchWorker = new ExchangeWorker();
 
+        latchMgr = new ExchangeLatchManager(cctx.kernalContext());
+
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
@@ -1255,6 +1261,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     m.addPartitionUpdateCounters(grp.groupId(),
                         newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+                    m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes());
                 }
             }
         }
@@ -1277,6 +1285,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 m.addPartitionUpdateCounters(top.groupId(),
                     newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+                m.addPartitionSizes(top.groupId(), top.partitionSizes());
             }
         }
 
@@ -1570,6 +1580,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @return Latch manager instance.
+     */
+    public ExchangeLatchManager latch() {
+        return latchMgr;
+    }
+
+    /**
      * @param exchFut Optional current exchange future.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c2f9229..b3b4f0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.function.BiFunction;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -711,7 +710,7 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @return Ttl cleanup manager.
-     * */
+     */
     public GridCacheSharedTtlCleanupManager ttl() {
         return ttlMgr;
     }
@@ -854,10 +853,14 @@ public class GridCacheSharedContext<K, V> {
         GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer);
 
         f.add(mvcc().finishExplicitLocks(topVer));
-        f.add(tm().finishTxs(topVer));
         f.add(mvcc().finishAtomicUpdates(topVer));
         f.add(mvcc().finishDataStreamerUpdates(topVer));
 
+        IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer);
+        // To properly track progress of finishing local tx updates we explicitly add this future to compound set.
+        f.add(finishLocalTxsFuture);
+        f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
+
         f.markInitialized();
 
         return f;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a5169d2..d672420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1732,7 +1732,7 @@ public class GridCacheUtils {
                             ver,
                             expiryPlc == null ? 0 : expiryPlc.forCreate(),
                             expiryPlc == null ? 0 : toExpireTime(expiryPlc.forCreate()),
-                            false,
+                            true,
                             topVer,
                             GridDrType.DR_BACKUP,
                             true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 3d83f87..a12c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -22,11 +22,11 @@ import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -344,7 +344,7 @@ public interface IgniteCacheOffheapManager {
      * @param part Partition.
      * @return Number of entries.
      */
-    public int totalPartitionEntriesCount(int part);
+    public long totalPartitionEntriesCount(int part);
 
     /**
      *
@@ -381,7 +381,7 @@ public interface IgniteCacheOffheapManager {
          * @param cacheId Cache ID.
          * @return Size.
          */
-        int cacheSize(int cacheId);
+        long cacheSize(int cacheId);
 
         /**
          * @return Cache sizes if store belongs to group containing multiple caches.
@@ -391,7 +391,7 @@ public interface IgniteCacheOffheapManager {
         /**
          * @return Total size.
          */
-        int fullSize();
+        long fullSize();
 
         /**
          * @return Update counter.

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index b201935..f8cc86f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -252,7 +252,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public int totalPartitionEntriesCount(int p) {
+    @Override public long totalPartitionEntriesCount(int p) {
         if (grp.isLocal())
             return locCacheDataStore.fullSize();
         else {
@@ -1152,14 +1152,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public int cacheSize(int cacheId) {
+        @Override public long cacheSize(int cacheId) {
             if (grp.sharedGroup()) {
                 AtomicLong size = cacheSizes.get(cacheId);
 
                 return size != null ? (int)size.get() : 0;
             }
 
-            return (int)storageSize.get();
+            return storageSize.get();
         }
 
         /** {@inheritDoc} */
@@ -1176,8 +1176,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public int fullSize() {
-            return (int)storageSize.get();
+        @Override public long fullSize() {
+            return storageSize.get();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5bbbb31..3e3bb0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1196,6 +1196,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> partitionSizes() {
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
         assert false : "Should not be called on non-affinity node";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 7a47f31..ea20dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -929,7 +929,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     /**
      * @return Initial update counter.
      */
-    public Long initialUpdateCounter() {
+    public long initialUpdateCounter() {
         return store.initialUpdateCounter();
     }
 
@@ -948,6 +948,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @return Total size of all caches.
+     */
+    public long fullSize() {
+        return store.fullSize();
+    }
+
+    /**
      * Removes all entries and rows from this partition.
      *
      * @return Number of rows cleared from page memory.

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7f900cb..6f68dbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -345,6 +346,11 @@ public interface GridDhtPartitionTopology {
     public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
 
     /**
+     * @return Partition cache sizes.
+     */
+    public Map<Integer, Long> partitionSizes();
+
+    /**
      * @param part Partition to own.
      * @return {@code True} if owned.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 538c57e..740903e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -2526,6 +2528,28 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> partitionSizes() {
+        lock.readLock().lock();
+
+        try {
+            Map<Integer, Long> partitionSizes = new HashMap<>();
+
+            for (int p = 0; p < locParts.length(); p++) {
+                GridDhtLocalPartition part = locParts.get(p);
+                if (part == null || part.fullSize() == 0)
+                    continue;
+
+                partitionSizes.put(part.id(), part.fullSize());
+            }
+
+            return partitionSizes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
         AffinityTopologyVersion curTopVer = this.readyTopVer;
 
@@ -2587,7 +2611,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                int size = part.dataStore().fullSize();
+                long size = part.dataStore().fullSize();
 
                 if (size >= threshold)
                     X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
new file mode 100644
index 0000000..92a0584
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ * Class to validate partitions update counters and cache sizes during exchange process.
+ */
+public class GridDhtPartitionsStateValidator {
+    /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
+    private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache shared context.
+     */
+    public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
+        this.cctx = cctx;
+    }
+
+    /**
+     * Validates partition states - update counters and cache sizes for all nodes.
+     * If update counter value or cache size for the same partitions are different on some nodes
+     * method throws exception with full information about inconsistent partitions.
+     *
+     * @param fut Current exchange future.
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @throws IgniteCheckedException If validation failed. Exception message contains
+     * full information about all partitions which update counters or cache sizes are not consistent.
+     */
+    public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut,
+                                                  GridDhtPartitionTopology top,
+                                                  Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException {
+        // Ignore just joined nodes.
+        final Set<UUID> ignoringNodes = new HashSet<>();
+
+        for (DiscoveryEvent evt : fut.events().events())
+            if (evt.type() == EVT_NODE_JOINED)
+                ignoringNodes.add(evt.eventNode().id());
+
+        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+        // Validate update counters.
+        Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+
+        // For sizes validation ignore also nodes which are not able to send cache sizes.
+        for (UUID id : messages.keySet()) {
+            ClusterNode node = cctx.discovery().node(id);
+            if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
+                ignoringNodes.add(id);
+        }
+
+        // Validate cache sizes.
+        result = validatePartitionsSizes(top, messages, ignoringNodes);
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+    }
+
+    /**
+     * Validate partitions update counters for given {@code top}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+     * If map is empty validation is successful.
+     */
+     Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+            GridDhtPartitionTopology top,
+            Map<UUID, GridDhtPartitionsSingleMessage> messages,
+            Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+
+        // Populate counters statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate counters from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                int partIdx = countersMap.partitionIndex(part);
+                long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0;
+
+                process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Validate partitions cache sizes for given {@code top}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+     * If map is empty validation is successful.
+     */
+     Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+            GridDhtPartitionTopology top,
+            Map<UUID, GridDhtPartitionsSingleMessage> messages,
+            Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+
+        // Populate sizes statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate sizes from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                long currentSize = sizesMap.containsKey(part) ? sizesMap.get(part) : 0L;
+
+                process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Processes given {@code counter} for partition {@code part} reported by {@code node}.
+     * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+     *
+     * @param invalidPartitions Invalid partitions map.
+     * @param countersAndNodes Current map of counters and nodes by partitions.
+     * @param part Processing partition.
+     * @param node Node id.
+     * @param counter Counter value reported by {@code node}.
+     */
+    private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
+                         Map<Integer, T2<UUID, Long>> countersAndNodes,
+                         int part,
+                         UUID node,
+                         long counter) {
+        T2<UUID, Long> existingData = countersAndNodes.get(part);
+
+        if (existingData == null)
+            countersAndNodes.put(part, new T2<>(node, counter));
+
+        if (existingData != null && counter != existingData.get2()) {
+            if (!invalidPartitions.containsKey(part)) {
+                Map<UUID, Long> map = new HashMap<>();
+                map.put(existingData.get1(), existingData.get2());
+                invalidPartitions.put(part, map);
+            }
+
+            invalidPartitions.get(part).put(node, counter);
+        }
+    }
+
+    /**
+     * Folds given map of invalid partition states to string representation in the following format:
+     * Part [id]: [consistentId=value*]
+     *
+     * Value can be both update counter or cache size.
+     *
+     * @param topVer Last topology version.
+     * @param invalidPartitions Invalid partitions map.
+     * @return String representation of invalid partitions.
+     */
+    private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) {
+        SB sb = new SB();
+
+        NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions);
+
+        for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) {
+            sb.a("Part ").a(p.getKey()).a(": [");
+            for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
+                Object consistentId = cctx.discovery().node(topVer, e.getKey()).consistentId();
+                sb.a(consistentId).a("=").a(e.getValue()).a(" ");
+            }
+            sb.a("] ");
+        }
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 28cc018..0609f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -447,6 +447,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
             err = e;
         }
+        catch (Throwable t) {
+            fut.onDone(t);
+
+            throw t;
+        }
 
         if (primarySync)
             sendFinishReply(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbb4985..dd4a571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -75,10 +76,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -290,6 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private GridDhtPartitionsExchangeFuture mergedWith;
 
+    /** Validator for partition states. */
+    @GridToStringExclude
+    private final GridDhtPartitionsStateValidator validator;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -314,6 +321,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         this.exchId = exchId;
         this.exchActions = exchActions;
         this.affChangeMsg = affChangeMsg;
+        this.validator = new GridDhtPartitionsStateValidator(cctx);
 
         log = cctx.logger(getClass());
         exchLog = cctx.logger(EXCHANGE_LOG);
@@ -1099,7 +1107,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
         partHistReserved = cctx.database().reserveHistoryForExchange();
 
-        waitPartitionRelease();
+        // On first phase we wait for finishing all local tx updates, atomic updates and lock releases.
+        waitPartitionRelease(1);
+
+        // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
+        waitPartitionRelease(2);
 
         boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
@@ -1202,9 +1214,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * For the exact list of the objects being awaited for see
      * {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
      *
+     * @param phase Phase of partition release.
+     *
      * @throws IgniteCheckedException If failed.
      */
-    private void waitPartitionRelease() throws IgniteCheckedException {
+    private void waitPartitionRelease(int phase) throws IgniteCheckedException {
+        Latch releaseLatch = null;
+
+        // Wait for other nodes only on first phase.
+        if (phase == 1)
+            releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion());
+
         IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
 
         // Assign to class variable so it will be included into toString() method.
@@ -1238,6 +1258,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
                 }
             }
+            catch (IgniteCheckedException e) {
+                U.warn(log,"Unable to await partitions release future", e);
+
+                throw e;
+            }
         }
 
         long waitEnd = U.currentTimeMillis();
@@ -1290,6 +1315,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
         }
+
+        if (releaseLatch == null)
+            return;
+
+        releaseLatch.countDown();
+
+        if (!localJoinExchange()) {
+            try {
+                while (true) {
+                    try {
+                        releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS);
+
+                        if (log.isInfoEnabled())
+                            log.info("Finished waiting for partitions release latch: " + releaseLatch);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
+
+                        // Try to resend ack.
+                        releaseLatch.countDown();
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
+            }
+        }
     }
 
     /**
@@ -2499,6 +2553,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            validatePartitionsState();
+
             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
@@ -2683,6 +2739,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Validates that partition update counters and cache sizes for all caches are consistent.
+     */
+    private void validatePartitionsState() {
+        for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+            CacheGroupDescriptor grpDesc = e.getValue();
+            if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
+                continue;
+
+            int grpId = e.getKey();
+
+            CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpId);
+
+            GridDhtPartitionTopology top = grpCtx != null ?
+                    grpCtx.topology() :
+                    cctx.exchange().clientTopology(grpId, events().discoveryCache());
+
+            // Do not validate read or write through caches or caches with disabled rebalance.
+            if (grpCtx == null
+                    || grpCtx.config().isReadThrough()
+                    || grpCtx.config().isWriteThrough()
+                    || grpCtx.config().getCacheStoreFactory() != null
+                    || grpCtx.config().getRebalanceDelay() != -1
+                    || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+                continue;
+
+            try {
+                validator.validatePartitionCountersAndSizes(this, top, msgs);
+            }
+            catch (IgniteCheckedException ex) {
+                log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex);
+                // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+            }
+        }
+    }
+
+    /**
      *
      */
     private void assignPartitionsStates() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 215152d..6ebafac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.util.Collection;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -67,6 +67,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
 
+    /** Partitions sizes. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partSizes;
+
+    /** Serialized partitions counters. */
+    private byte[] partSizesBytes;
+
     /** Partitions history reservation counters. */
     @GridToStringInclude
     @GridDirectTransient
@@ -220,6 +228,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * Adds partition sizes map for specified {@code grpId} to the current message.
+     *
+     * @param grpId Group id.
+     * @param partSizesMap Partition sizes map.
+     */
+    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
+        if (partSizesMap.isEmpty())
+            return;
+
+        if (partSizes == null)
+            partSizes = new HashMap<>();
+
+        partSizes.put(grpId, partSizesMap);
+    }
+
+    /**
+     * Returns partition sizes map for specified {@code grpId}.
+     *
+     * @param grpId Group id.
+     * @return Partition sizes map (partId, partSize).
+     */
+    public Map<Integer, Long> partitionSizes(int grpId) {
+        if (partSizes == null)
+            return Collections.emptyMap();
+
+        return partSizes.getOrDefault(grpId, Collections.emptyMap());
+    }
+
+    /**
      * @param grpId Cache group ID.
      * @param cntrMap Partition history counters.
      */
@@ -287,12 +324,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         boolean marshal = (parts != null && partsBytes == null) ||
             (partCntrs != null && partCntrsBytes == null) ||
             (partHistCntrs != null && partHistCntrsBytes == null) ||
+            (partSizes != null && partSizesBytes == null) ||
             (err != null && errBytes == null);
 
         if (marshal) {
             byte[] partsBytes0 = null;
             byte[] partCntrsBytes0 = null;
             byte[] partHistCntrsBytes0 = null;
+            byte[] partSizesBytes0 = null;
             byte[] errBytes0 = null;
 
             if (parts != null && partsBytes == null)
@@ -304,6 +343,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             if (partHistCntrs != null && partHistCntrsBytes == null)
                 partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
 
+            if (partSizes != null && partSizesBytes == null)
+                partSizesBytes0 = U.marshal(ctx, partSizes);
+
             if (err != null && errBytes == null)
                 errBytes0 = U.marshal(ctx, err);
 
@@ -314,11 +356,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
                     byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
+                    byte[] partSizesBytesZip = U.zip(partSizesBytes0);
                     byte[] exBytesZip = U.zip(errBytes0);
 
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
                     partHistCntrsBytes0 = partHistCntrsBytesZip;
+                    partSizesBytes0 = partSizesBytesZip;
                     errBytes0 = exBytesZip;
 
                     compressed(true);
@@ -331,6 +375,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             partsBytes = partsBytes0;
             partCntrsBytes = partCntrsBytes0;
             partHistCntrsBytes = partHistCntrsBytes0;
+            partSizesBytes = partSizesBytes0;
             errBytes = errBytes0;
         }
     }
@@ -360,6 +405,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partSizesBytes != null && partSizes == null) {
+            if (compressed())
+                partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partSizes = U.unmarshal(ctx, partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
         if (errBytes != null && err == null) {
             if (compressed())
                 err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -451,6 +503,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 writer.incrementState();
 
+            case 13:
+                if (!writer.writeByteArray("partsSizesBytes", partSizesBytes))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -531,6 +588,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
+            case 13:
+                partSizesBytes = reader.readByteArray("partsSizesBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -543,7 +607,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 596fa8c..42a9ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -235,7 +235,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
             if (awaited.remove(node.id())) {
                 GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
 
-                if (fullMsg0 != null) {
+                if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != null) {
                     assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
 
                     fullMsg  = fullMsg0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
new file mode 100644
index 0000000..c205cb1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Class is responsible to create and manage instances of distributed latches {@link Latch}.
+ */
+public class ExchangeLatchManager {
+    /** Version since latch management is available. */
+    private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Discovery manager. */
+    private final GridDiscoveryManager discovery;
+
+    /** IO manager. */
+    private final GridIoManager io;
+
+    /** Current coordinator. */
+    private volatile ClusterNode coordinator;
+
+    /** Pending acks collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+
+    /** Server latches collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+
+    /** Client latches collection. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+
+    /** Lock. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public ExchangeLatchManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+        this.log = ctx.log(getClass());
+        this.discovery = ctx.discovery();
+        this.io = ctx.io();
+
+        if (!ctx.clientNode()) {
+            ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
+                if (msg instanceof LatchAckMessage) {
+                    processAck(nodeId, (LatchAckMessage) msg);
+                }
+            });
+
+            // First coordinator initialization.
+            ctx.discovery().localJoinFuture().listen(f -> {
+                this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+            });
+
+            ctx.event().addDiscoveryEventListener((e, cache) -> {
+                assert e != null;
+                assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
+
+                // Do not process from discovery thread.
+                ctx.closure().runLocalSafe(() -> processNodeLeft(e.eventNode()));
+            }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        }
+    }
+
+    /**
+     * Creates server latch with given {@code id} and {@code topVer}.
+     * Adds corresponding pending acks to it.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param participants Participant nodes.
+     * @return Server latch instance.
+     */
+    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+        if (serverLatches.containsKey(latchId))
+            return serverLatches.get(latchId);
+
+        ServerLatch latch = new ServerLatch(id, topVer, participants);
+
+        serverLatches.put(latchId, latch);
+
+        if (log.isDebugEnabled())
+            log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
+
+        if (pendingAcks.containsKey(latchId)) {
+            Set<UUID> acks = pendingAcks.get(latchId);
+
+            for (UUID node : acks)
+                if (latch.hasParticipant(node) && !latch.hasAck(node))
+                    latch.ack(node);
+
+            pendingAcks.remove(latchId);
+        }
+
+        if (latch.isCompleted())
+            serverLatches.remove(latchId);
+
+        return latch;
+    }
+
+    /**
+     * Creates client latch.
+     * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param coordinator Coordinator node.
+     * @param participants Participant nodes.
+     * @return Client latch instance.
+     */
+    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+        if (clientLatches.containsKey(latchId))
+            return clientLatches.get(latchId);
+
+        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+
+        if (log.isDebugEnabled())
+            log.debug("Client latch is created [latch=" + latchId
+                    + ", crd=" + coordinator
+                    + ", participantsSize=" + participants.size() + "]");
+
+        // There is final ack for created latch.
+        if (pendingAcks.containsKey(latchId)) {
+            latch.complete();
+            pendingAcks.remove(latchId);
+        }
+        else
+            clientLatches.put(latchId, latch);
+
+        return latch;
+    }
+
+    /**
+     * Creates new latch with specified {@code id} and {@code topVer} or returns existing latch.
+     *
+     * Participants of latch are calculated from given {@code topVer} as alive server nodes.
+     * If local node is coordinator {@code ServerLatch} instance will be created, otherwise {@code ClientLatch} instance.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @return Latch instance.
+     */
+    public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(topVer);
+
+            if (coordinator == null) {
+                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
+                latch.complete();
+
+                return latch;
+            }
+
+            Collection<ClusterNode> participants = getLatchParticipants(topVer);
+
+            return coordinator.isLocal()
+                ? createServerLatch(id, topVer, participants)
+                : createClientLatch(id, topVer, coordinator, participants);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @param topVer Latch topology version.
+     * @return Collection of alive server nodes with latch functionality.
+     */
+    private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE
+                ? discovery.aliveServerNodes()
+                : discovery.discoCache(topVer).aliveServerNodes();
+
+        return aliveNodes
+                .stream()
+                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * @param topVer Latch topology version.
+     * @return Oldest alive server node with latch functionality.
+     */
+    @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE
+                ? discovery.aliveServerNodes()
+                : discovery.discoCache(topVer).aliveServerNodes();
+
+        return aliveNodes
+                .stream()
+                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Processes ack message from given {@code from} node.
+     *
+     * Completes client latch in case of final ack message.
+     *
+     * If no latch is associated with message, ack is placed to {@link #pendingAcks} set.
+     *
+     * @param from Node sent ack.
+     * @param message Ack message.
+     */
+    private void processAck(UUID from, LatchAckMessage message) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+
+            if (message.isFinal()) {
+                if (log.isDebugEnabled())
+                    log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
+
+                if (clientLatches.containsKey(latchId)) {
+                    ClientLatch latch = clientLatches.remove(latchId);
+                    latch.complete();
+                }
+                else if (!coordinator.isLocal()) {
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
+                    pendingAcks.get(latchId).add(from);
+                }
+            } else {
+                if (log.isDebugEnabled())
+                    log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
+
+                if (serverLatches.containsKey(latchId)) {
+                    ServerLatch latch = serverLatches.get(latchId);
+
+                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                        latch.ack(from);
+
+                        if (latch.isCompleted())
+                            serverLatches.remove(latchId);
+                    }
+                }
+                else {
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
+                    pendingAcks.get(latchId).add(from);
+                }
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Changes coordinator to current local node.
+     * Restores all server latches from pending acks and own client latches.
+     */
+    private void becomeNewCoordinator() {
+        if (log.isInfoEnabled())
+            log.info("Become new coordinator " + coordinator.id());
+
+        List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
+        latchesToRestore.addAll(pendingAcks.keySet());
+        latchesToRestore.addAll(clientLatches.keySet());
+
+        for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
+            String id = latchId.get1();
+            AffinityTopologyVersion topVer = latchId.get2();
+            Collection<ClusterNode> participants = getLatchParticipants(topVer);
+
+            if (!participants.isEmpty())
+                createServerLatch(id, topVer, participants);
+        }
+    }
+
+    /**
+     * Handles node left discovery event.
+     *
+     * Summary:
+     * Removes pending acks corresponds to the left node.
+     * Adds fake acknowledgements to server latches where such node was participant.
+     * Changes client latches coordinator to oldest available server node where such node was coordinator.
+     * Detects coordinator change.
+     *
+     * @param left Left node.
+     */
+    private void processNodeLeft(ClusterNode left) {
+        assert this.coordinator != null : "Coordinator is not initialized";
+
+        lock.lock();
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Process node left " + left.id());
+
+            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            // Clear pending acks.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet())
+                if (ackEntry.getValue().contains(left.id()))
+                    pendingAcks.get(ackEntry.getKey()).remove(left.id());
+
+            // Change coordinator for client latches.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+                ClientLatch latch = latchEntry.getValue();
+                if (latch.hasCoordinator(left.id())) {
+                    // Change coordinator for latch and re-send ack if necessary.
+                    if (latch.hasParticipant(coordinator.id()))
+                        latch.newCoordinator(coordinator);
+                    else {
+                        /* If new coordinator is not able to take control on the latch,
+                           it means that all other latch participants are left from topology
+                           and there is no reason to track such latch. */
+                        AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+
+                        assert getLatchParticipants(topVer).isEmpty();
+
+                        latch.complete(new IgniteCheckedException("All latch participants are left from topology."));
+                        clientLatches.remove(latchEntry.getKey());
+                    }
+                }
+            }
+
+            // Add acknowledgements from left node.
+            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+                ServerLatch latch = latchEntry.getValue();
+
+                if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]");
+
+                    latch.ack(left.id());
+
+                    if (latch.isCompleted())
+                        serverLatches.remove(latchEntry.getKey());
+                }
+            }
+
+            // Coordinator is changed.
+            if (coordinator.isLocal() && this.coordinator.id() != coordinator.id()) {
+                this.coordinator = coordinator;
+
+                becomeNewCoordinator();
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Latch creating on coordinator node.
+     * Latch collects acks from participants: non-coordinator nodes and current local node.
+     * Latch completes when all acks from all participants are received.
+     *
+     * After latch completion final ack is sent to all participants.
+     */
+    class ServerLatch extends CompletableLatch {
+        /** Number of latch permits. This is needed to track number of countDown invocations. */
+        private final AtomicInteger permits;
+
+        /** Set of received acks. */
+        private final Set<UUID> acks = new GridConcurrentHashSet<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes.
+         */
+        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+            this.permits = new AtomicInteger(participants.size());
+
+            // Send final acks when latch is completed.
+            this.complete.listen(f -> {
+                for (ClusterNode node : participants) {
+                    try {
+                        if (discovery.alive(node)) {
+                            io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Final ack is ackSent [latch=" + latchId() + ", to=" + node.id() + "]");
+                        }
+                    } catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to send final ack [latch=" + latchId() + ", to=" + node.id() + "]");
+                    }
+                }
+            });
+        }
+
+        /**
+         * Checks if latch has ack from given node.
+         *
+         * @param from Node.
+         * @return {@code true} if latch has ack from given node.
+         */
+        private boolean hasAck(UUID from) {
+            return acks.contains(from);
+        }
+
+        /**
+         * Receives ack from given node.
+         * Count downs latch if ack was not already processed.
+         *
+         * @param from Node.
+         */
+        private void ack(UUID from) {
+            if (log.isDebugEnabled())
+                log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + "]");
+
+            countDown0(from);
+        }
+
+        /**
+         * Count down latch from ack of given node.
+         * Completes latch if all acks are received.
+         *
+         * @param node Node.
+         */
+        private void countDown0(UUID node) {
+            if (isCompleted() || acks.contains(node))
+                return;
+
+            acks.add(node);
+
+            int remaining = permits.decrementAndGet();
+
+            if (log.isDebugEnabled())
+                log.debug("Count down + [latch=" + latchId() + ", remaining=" + remaining + "]");
+
+            if (remaining == 0)
+                complete();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            countDown0(ctx.localNodeId());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            Set<UUID> pendingAcks = participants.stream().filter(ack -> !acks.contains(ack)).collect(Collectors.toSet());
+
+            return S.toString(ServerLatch.class, this,
+                    "pendingAcks", pendingAcks,
+                    "super", super.toString());
+        }
+    }
+
+    /**
+     * Latch creating on non-coordinator node.
+     * Latch completes when final ack from coordinator is received.
+     */
+    class ClientLatch extends CompletableLatch {
+        /** Latch coordinator node. Can be changed if coordinator is left from topology. */
+        private volatile ClusterNode coordinator;
+
+        /** Flag indicates that ack is sent to coordinator. */
+        private boolean ackSent;
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param coordinator Coordinator node.
+         * @param participants Participant nodes.
+         */
+        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+            super(id, topVer, participants);
+
+            this.coordinator = coordinator;
+        }
+
+        /**
+         * Checks if latch coordinator is given {@code node}.
+         *
+         * @param node Node.
+         * @return {@code true} if latch coordinator is given node.
+         */
+        private boolean hasCoordinator(UUID node) {
+            return coordinator.id().equals(node);
+        }
+
+        /**
+         * Changes coordinator of latch and resends ack to new coordinator if needed.
+         *
+         * @param coordinator New coordinator.
+         */
+        private void newCoordinator(ClusterNode coordinator) {
+            if (log.isDebugEnabled())
+                log.debug("Coordinator is changed [latch=" + latchId() + ", crd=" + coordinator.id() + "]");
+
+            synchronized (this) {
+                this.coordinator = coordinator;
+
+                // Resend ack to new coordinator.
+                if (ackSent)
+                    sendAck();
+            }
+        }
+
+        /**
+         * Sends ack to coordinator node.
+         * There is ack deduplication on coordinator. So it's fine to send same ack twice.
+         */
+        private void sendAck() {
+            try {
+                ackSent = true;
+
+                io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL);
+
+                if (log.isDebugEnabled())
+                    log.debug("Ack is ackSent + [latch=" + latchId() + ", to=" + coordinator.id() + "]");
+            } catch (IgniteCheckedException e) {
+                // Coordinator is unreachable. On coodinator node left discovery event ack will be resent.
+                if (log.isDebugEnabled())
+                    log.debug("Unable to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void countDown() {
+            if (isCompleted())
+                return;
+
+            // Synchronize in case of changed coordinator.
+            synchronized (this) {
+                sendAck();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ClientLatch.class, this,
+                    "super", super.toString());
+        }
+    }
+
+    /**
+     * Base latch functionality with implemented complete / await logic.
+     */
+    private abstract static class CompletableLatch implements Latch {
+        /** Latch id. */
+        @GridToStringInclude
+        protected final String id;
+
+        /** Latch topology version. */
+        @GridToStringInclude
+        protected final AffinityTopologyVersion topVer;
+
+        /** Latch node participants. Only participant nodes are able to change state of latch. */
+        @GridToStringExclude
+        protected final Set<UUID> participants;
+
+        /** Future indicates that latch is completed. */
+        @GridToStringExclude
+        protected final GridFutureAdapter<?> complete = new GridFutureAdapter<>();
+
+        /**
+         * Constructor.
+         *
+         * @param id Latch id.
+         * @param topVer Latch topology version.
+         * @param participants Participant nodes.
+         */
+        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+            this.id = id;
+            this.topVer = topVer;
+            this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await() throws IgniteCheckedException {
+            complete.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException {
+            complete.get(timeout, timeUnit);
+        }
+
+        /**
+         * Checks if latch participants contain given {@code node}.
+         *
+         * @param node Node.
+         * @return {@code true} if latch participants contain given node.
+         */
+        boolean hasParticipant(UUID node) {
+            return participants.contains(node);
+        }
+
+        /**
+         * @return {@code true} if latch is completed.
+         */
+        boolean isCompleted() {
+            return complete.isDone();
+        }
+
+        /**
+         * Completes current latch.
+         */
+        void complete() {
+            complete.onDone();
+        }
+
+        /**
+         * Completes current latch with given {@code error}.
+         *
+         * @param error Error.
+         */
+        void complete(Throwable error) {
+            complete.onDone(error);
+        }
+
+        /**
+         * @return Full latch id.
+         */
+        String latchId() {
+            return id + "-" + topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CompletableLatch.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
new file mode 100644
index 0000000..9704c2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Simple distributed count down latch interface.
+ * Latch supports count down and await logic.
+ * Latch functionality is not relied on caches and has own state management {@link ExchangeLatchManager}.
+ */
+public interface Latch {
+    /**
+     * Decrements count on current latch.
+     * Release all latch waiters on all nodes if count reaches zero.
+     *
+     * This is idempotent operation. Invoking this method twice or more on the same node doesn't have any effect.
+     */
+    void countDown();
+
+    /**
+     * Awaits current latch completion.
+     *
+     * @throws IgniteCheckedException If await is failed.
+     */
+    void await() throws IgniteCheckedException;
+
+    /**
+     * Awaits current latch completion with specified timeout.
+     *
+     * @param timeout Timeout value.
+     * @param timeUnit Timeout time unit.
+     * @throws IgniteCheckedException If await is failed.
+     */
+    void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException;
+}


[06/10] ignite git commit: IGNITE-7830: Knn Lin Reg with new datasets

Posted by ak...@apache.org.
IGNITE-7830: Knn Lin Reg with new datasets

this closes #3583


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4653b7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4653b7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4653b7c

Branch: refs/heads/ignite-8201
Commit: a4653b7c1287a039206bf22e9d85125bb15bc412
Parents: 6557fe6
Author: zaleslaw <za...@gmail.com>
Authored: Wed Apr 11 12:31:48 2018 +0300
Committer: YuriBabak <y....@gmail.com>
Committed: Wed Apr 11 12:31:48 2018 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/ml/knn/KNNUtils.java |  59 ++++++++
 .../KNNClassificationTrainer.java               |  23 +--
 .../ml/knn/regression/KNNRegressionModel.java   |  87 +++++++++++
 .../ml/knn/regression/KNNRegressionTrainer.java |  40 ++++++
 .../ignite/ml/knn/regression/package-info.java  |  22 +++
 .../apache/ignite/ml/knn/KNNRegressionTest.java | 143 +++++++++++++++++++
 .../org/apache/ignite/ml/knn/KNNTestSuite.java  |   1 +
 7 files changed, 354 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
new file mode 100644
index 0000000..88fa70f
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn;
+
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.structures.LabeledDataset;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.structures.partition.LabeledDatasetPartitionDataBuilderOnHeap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for KNNRegression.
+ */
+public class KNNUtils {
+    /**
+     * Builds dataset.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor.
+     * @param lbExtractor Label extractor.
+     * @return Dataset.
+     */
+    @Nullable public static <K, V> Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> buildDataset(DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
+        PartitionDataBuilder<K, V, KNNPartitionContext, LabeledDataset<Double, LabeledVector>> partDataBuilder
+            = new LabeledDatasetPartitionDataBuilderOnHeap<>(
+            featureExtractor,
+            lbExtractor
+        );
+
+        Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset = null;
+
+        if (datasetBuilder != null) {
+            dataset = datasetBuilder.build(
+                (upstream, upstreamSize) -> new KNNPartitionContext(),
+                partDataBuilder
+            );
+        }
+        return dataset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
index 357047f..c0c8e65 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
@@ -17,14 +17,9 @@
 
 package org.apache.ignite.ml.knn.classification;
 
-import org.apache.ignite.ml.dataset.Dataset;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
-import org.apache.ignite.ml.dataset.PartitionDataBuilder;
-import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.knn.KNNUtils;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.structures.LabeledDataset;
-import org.apache.ignite.ml.structures.partition.LabeledDatasetPartitionDataBuilderOnHeap;
-import org.apache.ignite.ml.structures.LabeledVector;
 import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
 
 /**
@@ -41,20 +36,6 @@ public class KNNClassificationTrainer implements SingleLabelDatasetTrainer<KNNCl
      */
     @Override public <K, V> KNNClassificationModel fit(DatasetBuilder<K, V> datasetBuilder,
         IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
-        PartitionDataBuilder<K, V, KNNPartitionContext, LabeledDataset<Double, LabeledVector>> partDataBuilder
-            = new LabeledDatasetPartitionDataBuilderOnHeap<>(
-            featureExtractor,
-            lbExtractor
-        );
-
-        Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset = null;
-
-        if (datasetBuilder != null) {
-            dataset = datasetBuilder.build(
-                (upstream, upstreamSize) -> new KNNPartitionContext(),
-                partDataBuilder
-            );
-        }
-        return new KNNClassificationModel<>(dataset);
+        return new KNNClassificationModel<>(KNNUtils.buildDataset(datasetBuilder, featureExtractor, lbExtractor));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
new file mode 100644
index 0000000..cabc143
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.knn.regression;
+
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.knn.classification.KNNClassificationModel;
+import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.structures.LabeledDataset;
+import org.apache.ignite.ml.structures.LabeledVector;
+
+import java.util.List;
+
+/**
+ * This class provides kNN Multiple Linear Regression or Locally [weighted] regression (Simple and Weighted versions).
+ *
+ * <p> This is an instance-based learning method. </p>
+ *
+ * <ul>
+ *     <li>Local means using nearby points (i.e. a nearest neighbors approach).</li>
+ *     <li>Weighted means we value points based upon how far away they are.</li>
+ *     <li>Regression means approximating a function.</li>
+ * </ul>
+ */
+public class KNNRegressionModel<K,V> extends KNNClassificationModel<K,V> {
+    /**
+     * Builds the model via prepared dataset.
+     * @param dataset Specially prepared object to run algorithm over it.
+     */
+    public KNNRegressionModel(Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset) {
+        super(dataset);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Double apply(Vector v) {
+        List<LabeledVector> neighbors = findKNearestNeighbors(v);
+
+        return predictYBasedOn(neighbors, v);
+    }
+
+    /** */
+    private double predictYBasedOn(List<LabeledVector> neighbors, Vector v) {
+        switch (stgy) {
+            case SIMPLE:
+                return simpleRegression(neighbors);
+            case WEIGHTED:
+                return weightedRegression(neighbors, v);
+            default:
+                throw new UnsupportedOperationException("Strategy " + stgy.name() + " is not supported");
+        }
+    }
+
+    /** */
+    private double weightedRegression(List<LabeledVector> neighbors, Vector v) {
+        double sum = 0.0;
+        double div = 0.0;
+        for (LabeledVector<Vector, Double> neighbor : neighbors) {
+            double distance = distanceMeasure.compute(v, neighbor.features());
+            sum += neighbor.label() * distance;
+            div += distance;
+        }
+        return sum / div;
+    }
+
+    /** */
+    private double simpleRegression(List<LabeledVector> neighbors) {
+        double sum = 0.0;
+        for (LabeledVector<Vector, Double> neighbor : neighbors)
+            sum += neighbor.label();
+        return sum / (double)k;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
new file mode 100644
index 0000000..2d13cd5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn.regression;
+
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.knn.KNNUtils;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * kNN algorithm trainer to solve regression task.
+ */
+public class KNNRegressionTrainer{
+    /**
+     * Trains model based on the specified data.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor.
+     * @param lbExtractor Label extractor.
+     * @return Model.
+     */
+    public <K, V> KNNRegressionModel fit(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
+        return new KNNRegressionModel<>(KNNUtils.buildDataset(datasetBuilder, featureExtractor, lbExtractor));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
new file mode 100644
index 0000000..82e7192
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains helper classes for kNN regression algorithms.
+ */
+package org.apache.ignite.ml.knn.regression;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
new file mode 100644
index 0000000..66dbca9
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+import org.apache.ignite.ml.knn.classification.KNNStrategy;
+import org.apache.ignite.ml.knn.regression.KNNRegressionModel;
+import org.apache.ignite.ml.knn.regression.KNNRegressionTrainer;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.distances.EuclideanDistance;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link KNNRegressionTrainer}.
+ */
+public class KNNRegressionTest extends BaseKNNTest {
+    /** */
+    private double[] y;
+
+    /** */
+    private double[][] x;
+
+    /** */
+    public void testSimpleRegressionWithOneNeighbour() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        Map<Integer, double[]> data = new HashMap<>();
+        data.put(0, new double[] {11.0, 0, 0, 0, 0, 0});
+        data.put(1, new double[] {12.0, 2.0, 0, 0, 0, 0});
+        data.put(2, new double[] {13.0, 0, 3.0, 0, 0, 0});
+        data.put(3, new double[] {14.0, 0, 0, 4.0, 0, 0});
+        data.put(4, new double[] {15.0, 0, 0, 0, 5.0, 0});
+        data.put(5, new double[] {16.0, 0, 0, 0, 0, 6.0});
+
+        KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+        KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+            new LocalDatasetBuilder<>(data, 2),
+            (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+            (k, v) -> v[0]
+        ).withK(1)
+            .withDistanceMeasure(new EuclideanDistance())
+            .withStrategy(KNNStrategy.SIMPLE);
+
+        Vector vector = new DenseLocalOnHeapVector(new double[] {0, 0, 0, 5.0, 0.0});
+        System.out.println(knnMdl.apply(vector));
+        Assert.assertEquals(15, knnMdl.apply(vector), 1E-12);
+    }
+
+    /** */
+    public void testLongly() {
+
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        Map<Integer, double[]> data = new HashMap<>();
+        data.put(0, new double[] {60323, 83.0, 234289, 2356, 1590, 107608, 1947});
+        data.put(1, new double[] {61122, 88.5, 259426, 2325, 1456, 108632, 1948});
+        data.put(2, new double[] {60171, 88.2, 258054, 3682, 1616, 109773, 1949});
+        data.put(3, new double[] {61187, 89.5, 284599, 3351, 1650, 110929, 1950});
+        data.put(4, new double[] {63221, 96.2, 328975, 2099, 3099, 112075, 1951});
+        data.put(5, new double[] {63639, 98.1, 346999, 1932, 3594, 113270, 1952});
+        data.put(6, new double[] {64989, 99.0, 365385, 1870, 3547, 115094, 1953});
+        data.put(7, new double[] {63761, 100.0, 363112, 3578, 3350, 116219, 1954});
+        data.put(8, new double[] {66019, 101.2, 397469, 2904, 3048, 117388, 1955});
+        data.put(9, new double[] {68169, 108.4, 442769, 2936, 2798, 120445, 1957});
+        data.put(10, new double[] {66513, 110.8, 444546, 4681, 2637, 121950, 1958});
+        data.put(11, new double[] {68655, 112.6, 482704, 3813, 2552, 123366, 1959});
+        data.put(12, new double[] {69564, 114.2, 502601, 3931, 2514, 125368, 1960});
+        data.put(13, new double[] {69331, 115.7, 518173, 4806, 2572, 127852, 1961});
+        data.put(14, new double[] {70551, 116.9, 554894, 4007, 2827, 130081, 1962});
+
+        KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+        KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+            new LocalDatasetBuilder<>(data, 2),
+            (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+            (k, v) -> v[0]
+        ).withK(3)
+            .withDistanceMeasure(new EuclideanDistance())
+            .withStrategy(KNNStrategy.SIMPLE);
+
+        Vector vector = new DenseLocalOnHeapVector(new double[] {104.6, 419180, 2822, 2857, 118734, 1956});
+        System.out.println(knnMdl.apply(vector));
+        Assert.assertEquals(67857, knnMdl.apply(vector), 2000);
+    }
+
+    /** */
+    public void testLonglyWithWeightedStrategy() {
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        Map<Integer, double[]> data = new HashMap<>();
+        data.put(0, new double[] {60323, 83.0, 234289, 2356, 1590, 107608, 1947});
+        data.put(1, new double[] {61122, 88.5, 259426, 2325, 1456, 108632, 1948});
+        data.put(2, new double[] {60171, 88.2, 258054, 3682, 1616, 109773, 1949});
+        data.put(3, new double[] {61187, 89.5, 284599, 3351, 1650, 110929, 1950});
+        data.put(4, new double[] {63221, 96.2, 328975, 2099, 3099, 112075, 1951});
+        data.put(5, new double[] {63639, 98.1, 346999, 1932, 3594, 113270, 1952});
+        data.put(6, new double[] {64989, 99.0, 365385, 1870, 3547, 115094, 1953});
+        data.put(7, new double[] {63761, 100.0, 363112, 3578, 3350, 116219, 1954});
+        data.put(8, new double[] {66019, 101.2, 397469, 2904, 3048, 117388, 1955});
+        data.put(9, new double[] {68169, 108.4, 442769, 2936, 2798, 120445, 1957});
+        data.put(10, new double[] {66513, 110.8, 444546, 4681, 2637, 121950, 1958});
+        data.put(11, new double[] {68655, 112.6, 482704, 3813, 2552, 123366, 1959});
+        data.put(12, new double[] {69564, 114.2, 502601, 3931, 2514, 125368, 1960});
+        data.put(13, new double[] {69331, 115.7, 518173, 4806, 2572, 127852, 1961});
+        data.put(14, new double[] {70551, 116.9, 554894, 4007, 2827, 130081, 1962});
+
+        KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+        KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+            new LocalDatasetBuilder<>(data, 2),
+            (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+            (k, v) -> v[0]
+        ).withK(3)
+            .withDistanceMeasure(new EuclideanDistance())
+            .withStrategy(KNNStrategy.SIMPLE);
+
+        Vector vector = new DenseLocalOnHeapVector(new double[] {104.6, 419180, 2822, 2857, 118734, 1956});
+        System.out.println(knnMdl.apply(vector));
+        Assert.assertEquals(67857, knnMdl.apply(vector), 2000);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
index 95ebec5..55ef24e 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
@@ -26,6 +26,7 @@ import org.junit.runners.Suite;
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     KNNClassificationTest.class,
+    KNNRegressionTest.class,
     LabeledDatasetTest.class
 })
 public class KNNTestSuite {


[04/10] ignite git commit: IGNITE-7222 .NET: Ignore missing IgniteConfiguration.CommunicationFailureResolver

Posted by ak...@apache.org.
IGNITE-7222 .NET: Ignore missing IgniteConfiguration.CommunicationFailureResolver


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780fc07b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780fc07b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780fc07b

Branch: refs/heads/ignite-8201
Commit: 780fc07be0b257b578647682585c89548e6d695d
Parents: da77b98
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Wed Apr 11 11:51:45 2018 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Wed Apr 11 11:51:45 2018 +0300

----------------------------------------------------------------------
 .../ApiParity/IgniteConfigurationParityTest.cs                    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/780fc07b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
index d68083f..bf34fc0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
@@ -63,7 +63,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
             "ClassLoader",
             "CacheStoreSessionListenerFactories",
             "PlatformConfiguration",
-            "ExecutorConfiguration"
+            "ExecutorConfiguration",
+            "CommunicationFailureResolver"
         };
 
         /** Properties that are missing on .NET side. */