You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/04/11 12:28:23 UTC
[01/10] ignite git commit: IGNITE-8201 WIP.
Repository: ignite
Updated Branches:
refs/heads/ignite-8201 df938d562 -> 3780ac0a0
IGNITE-8201 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86d0bcb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86d0bcb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86d0bcb7
Branch: refs/heads/ignite-8201
Commit: 86d0bcb7d134686fb521dd7da5dc0e583d7f917c
Parents: df938d5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 15:19:58 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 15:19:58 2018 +0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 21 ++++++++++----------
...ettyRestProcessorAuthenticationSelfTest.java | 2 +-
.../processors/rest/GridRestProcessor.java | 2 +-
3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 5201b33..38f22ba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -294,8 +294,9 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
/**
* @param content Content to check.
+ * @return JSON node with actual response.
*/
- private JsonNode jsonCacheOperationResponse(String content, boolean bulk) throws IOException {
+ protected JsonNode assertResponseSucceeded(String content, boolean bulk) throws IOException {
assertNotNull(content);
assertFalse(content.isEmpty());
@@ -315,7 +316,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @param res Response.
*/
private void assertCacheOperation(String content, Object res) throws IOException {
- JsonNode ret = jsonCacheOperationResponse(content, false);
+ JsonNode ret = assertResponseSucceeded(content, false);
assertEquals(String.valueOf(res), ret.isObject() ? ret.toString() : ret.asText());
}
@@ -325,7 +326,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @param res Response.
*/
private void assertCacheBulkOperation(String content, Object res) throws IOException {
- JsonNode ret = jsonCacheOperationResponse(content, true);
+ JsonNode ret = assertResponseSucceeded(content, true);
assertEquals(String.valueOf(res), ret.asText());
}
@@ -334,7 +335,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @param content Content to check.
*/
private void assertCacheMetrics(String content) throws IOException {
- JsonNode ret = jsonCacheOperationResponse(content, true);
+ JsonNode ret = assertResponseSucceeded(content, true);
assertTrue(ret.isObject());
}
@@ -403,7 +404,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
* @throws IOException If failed.
*/
private void checkJson(String json, Person p) throws IOException {
- JsonNode res = jsonCacheOperationResponse(json, false);
+ JsonNode res = assertResponseSucceeded(json, false);
assertEquals(p.id.intValue(), res.get("id").asInt());
assertEquals(p.getOrganizationId().intValue(), res.get("orgId").asInt());
@@ -455,7 +456,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Get command result: " + ret);
- JsonNode res = jsonCacheOperationResponse(ret, false);
+ JsonNode res = assertResponseSucceeded(ret, false);
assertEquals("Alex", res.get("NAME").asText());
assertEquals(300, res.get("SALARY").asInt());
@@ -476,7 +477,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Get command result: " + ret);
- JsonNode json = jsonCacheOperationResponse(ret, false);
+ JsonNode json = assertResponseSucceeded(ret, false);
assertEquals(ref1.name, json.get("name").asText());
ref2.ref(ref1);
@@ -552,7 +553,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Get command result: " + ret);
- JsonNode res = jsonCacheOperationResponse(ret, false);
+ JsonNode res = assertResponseSucceeded(ret, false);
assertEquals(p.id, res.get("id").asInt());
assertEquals(p.name, res.get("name").asText());
@@ -637,7 +638,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Get command result: " + ret);
- JsonNode res = jsonCacheOperationResponse(ret, false);
+ JsonNode res = assertResponseSucceeded(ret, false);
assertEquals(t.getKey(), res.get("key").asText());
assertEquals(t.getValue(), res.get("value").asText());
@@ -775,7 +776,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Get all command result: " + ret);
- JsonNode res = jsonCacheOperationResponse(ret, true);
+ JsonNode res = assertResponseSucceeded(ret, true);
assertTrue(res.isObject());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
index 68a96da..145d7e9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
@@ -134,7 +134,7 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
String ret = content(null, GridRestCommand.VERSION);
- assertResponseContainsError(ret, "The user name or password is incorrect");
+ assertResponseSucceeded(ret, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d0bcb7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index a823be0..6ca1351 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -342,7 +342,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
assert res != null;
- if (ctx.security().enabled() && !failed)
+ if ((ctx.authentication().enabled() || ctx.security().enabled()) && !failed)
res.sessionTokenBytes(req.sessionToken());
interceptResponse(res, req);
[09/10] ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-8201
Posted by ak...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-8201
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe21ed57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe21ed57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe21ed57
Branch: refs/heads/ignite-8201
Commit: fe21ed5710396c6b2cb7b9cbb1f8f115e697dc49
Parents: 86d0bcb 74d2545
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 17:39:20 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 17:39:20 2018 +0700
----------------------------------------------------------------------
.../configuration/DataStorageConfiguration.java | 6 +-
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../communication/GridIoMessageFactory.java | 6 +
.../discovery/GridDiscoveryManager.java | 10 +
.../MetaPageUpdatePartitionDataRecord.java | 2 +-
.../processors/cache/CacheMetricsImpl.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 38 +
.../GridCachePartitionExchangeManager.java | 17 +
.../cache/GridCacheSharedContext.java | 9 +-
.../processors/cache/GridCacheUtils.java | 2 +-
.../cache/IgniteCacheOffheapManager.java | 8 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 10 +-
.../dht/GridClientPartitionTopology.java | 5 +
.../distributed/dht/GridDhtLocalPartition.java | 9 +-
.../dht/GridDhtPartitionTopology.java | 6 +
.../dht/GridDhtPartitionTopologyImpl.java | 26 +-
.../dht/GridDhtPartitionsStateValidator.java | 255 +++++++
.../cache/distributed/dht/GridDhtTxLocal.java | 5 +
.../GridDhtPartitionsExchangeFuture.java | 96 ++-
.../GridDhtPartitionsSingleMessage.java | 68 +-
.../dht/preloader/InitNewCoordinatorFuture.java | 2 +-
.../preloader/latch/ExchangeLatchManager.java | 695 +++++++++++++++++++
.../distributed/dht/preloader/latch/Latch.java | 52 ++
.../dht/preloader/latch/LatchAckMessage.java | 165 +++++
.../cache/distributed/near/GridNearTxLocal.java | 10 +
.../persistence/GridCacheOffheapManager.java | 10 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 36 +-
...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +-
.../DataStorageConfigurationValidationTest.java | 33 +-
.../processors/cache/IgniteCacheGroupsTest.java | 1 +
...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
.../GridCachePartitionsStateValidationTest.java | 316 +++++++++
...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
.../db/wal/IgniteWalFlushFailoverTest.java | 4 +-
...lFlushMultiNodeFailoverAbstractSelfTest.java | 4 +-
.../TxOptimisticOnPartitionExchangeTest.java | 322 +++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 4 +
.../testsuites/IgniteCacheTestSuite6.java | 6 +
.../java/org/apache/ignite/ml/knn/KNNUtils.java | 59 ++
.../KNNClassificationTrainer.java | 23 +-
.../ml/knn/regression/KNNRegressionModel.java | 87 +++
.../ml/knn/regression/KNNRegressionTrainer.java | 40 ++
.../ignite/ml/knn/regression/package-info.java | 22 +
.../apache/ignite/ml/knn/KNNRegressionTest.java | 143 ++++
.../org/apache/ignite/ml/knn/KNNTestSuite.java | 1 +
.../ApiParity/IgniteConfigurationParityTest.cs | 3 +-
modules/web-console/frontend/app/app.config.js | 14 +-
.../components/modal-import-models/component.js | 4 +-
.../app/components/page-profile/controller.js | 4 +-
.../frontend/app/modules/ace.module.js | 47 +-
.../services/AngularStrapSelect.decorator.js | 5 +-
.../services/AngularStrapTooltip.decorator.js | 8 +-
.../frontend/app/services/FormUtils.service.js | 3 +-
parent/pom.xml | 4 +
55 files changed, 3012 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
[10/10] ignite git commit: IGNITE-8201 WIP.
Posted by ak...@apache.org.
IGNITE-8201 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3780ac0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3780ac0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3780ac0a
Branch: refs/heads/ignite-8201
Commit: 3780ac0a09198e2918206d23ee515a2fe522352c
Parents: fe21ed5
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Apr 11 19:28:11 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 19:28:11 2018 +0700
----------------------------------------------------------------------
...ettyRestProcessorAuthenticationSelfTest.java | 43 ++++++++++----------
.../http/jetty/GridJettyRestHandler.java | 8 +++-
2 files changed, 28 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3780ac0a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
index 145d7e9..1536d9d 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAuthenticationSelfTest.java
@@ -41,12 +41,6 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
private static final String DFLT_PWD = "ignite";
/** */
- private String user = DFLT_USER;
-
- /** */
- private String pwd = DFLT_PWD;
-
- /** */
private String tok = "";
/** {@inheritDoc} */
@@ -60,8 +54,22 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- user = DFLT_USER;
- pwd = DFLT_PWD;
+ // Authenticate and extract token.
+ if (F.isEmpty(tok)) {
+ String ret = content(null, GridRestCommand.AUTHENTICATE,
+ "user", DFLT_USER,
+ "password", DFLT_PWD);
+
+ int p1 = ret.indexOf("sessionToken");
+ int p2 = ret.indexOf('"', p1 + 16);
+
+ tok = ret.substring(p1 + 15, p2);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean securityEnabled() {
+ return true;
}
/** {@inheritDoc} */
@@ -104,14 +112,8 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
@Override protected String restUrl() {
String url = super.restUrl();
- if (!F.isEmpty(user)) {
- url += "user=" + user;
-
- if (!F.isEmpty(pwd))
- url += "&password=" + pwd;
-
- url += '&';
- }
+ if (!F.isEmpty(tok))
+ url += "sessionToken=" + tok + "&";
return url;
}
@@ -122,19 +124,18 @@ public class JettyRestProcessorAuthenticationSelfTest extends JettyRestProcessor
public void testAuthenticationCommand() throws Exception {
String ret = content(null, GridRestCommand.AUTHENTICATE);
- assertResponseContainsError(ret, "The user name or password is incorrect");
+ assertResponseSucceeded(ret, false);
}
/**
* @throws Exception If failed.
*/
- public void testMissingCredentials() throws Exception {
- user = null;
- pwd = null;
+ public void testMissingSessionToken() throws Exception {
+ tok = null;
String ret = content(null, GridRestCommand.VERSION);
- assertResponseSucceeded(ret, false);
+ assertResponseContainsError(ret, "The user name or password is incorrect");
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3780ac0a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index e70ef6a..7f20dca 100644
--- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -69,6 +69,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.client.GridClientCacheFlag.KEEP_BINARIES_MASK;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.AUTHENTICATE;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CONTAINS_KEYS;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_ALL;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PUT_ALL;
@@ -852,8 +853,11 @@ public class GridJettyRestHandler extends AbstractHandler {
restReq.command(cmd);
- if (!credentials(params, USER_PARAM, PWD_PARAM, restReq))
- credentials(params, IGNITE_LOGIN_PARAM, IGNITE_PASSWORD_PARAM, restReq);
+ // Check credentials only for AUTHENTICATE command.
+ if (cmd == AUTHENTICATE) {
+ if (!credentials(params, USER_PARAM, PWD_PARAM, restReq))
+ credentials(params, IGNITE_LOGIN_PARAM, IGNITE_PASSWORD_PARAM, restReq);
+ }
String clientId = (String)params.get("clientId");
[02/10] ignite git commit: IGNITE-7871 Implemented additional
synchronization phase for correct partition counters update
Posted by ak...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
new file mode 100644
index 0000000..bad1b61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message is used to send acks for {@link Latch} instances management.
+ */
+public class LatchAckMessage implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Latch id. */
+ private String latchId;
+
+ /** Latch topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Flag indicates that ack is final. */
+ private boolean isFinal;
+
+ /**
+ * Constructor.
+ *
+ * @param latchId Latch id.
+ * @param topVer Latch topology version.
+ * @param isFinal Final acknowledgement flag.
+ */
+ public LatchAckMessage(String latchId, AffinityTopologyVersion topVer, boolean isFinal) {
+ this.latchId = latchId;
+ this.topVer = topVer;
+ this.isFinal = isFinal;
+ }
+
+ /**
+ * Empty constructor for marshalling purposes.
+ */
+ public LatchAckMessage() {
+ }
+
+ /**
+ * @return Latch id.
+ */
+ public String latchId() {
+ return latchId;
+ }
+
+ /**
+ * @return Latch topology version.
+ */
+ public AffinityTopologyVersion topVer() {
+ return topVer;
+ }
+
+ /**
+ * @return {@code} if ack is final.
+ */
+ public boolean isFinal() {
+ return isFinal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeBoolean("isFinal", isFinal))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeString("latchId", latchId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ isFinal = reader.readBoolean("isFinal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ latchId = reader.readString("latchId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(LatchAckMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 135;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 7785605..33f84f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3525,6 +3525,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
U.error(log, "Failed to prepare transaction: " + this, e);
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (err != null)
fut.rollbackOnError(err);
@@ -3544,6 +3549,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
U.error(log, "Failed to prepare transaction: " + this, e);
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (err != null)
fut.rollbackOnError(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5cfd92d..68ec83d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -189,7 +189,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
freeList.saveMetadata();
long updCntr = store.updateCounter();
- int size = store.fullSize();
+ long size = store.fullSize();
long rmvId = globalRemoveId().get();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
@@ -318,7 +318,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
partMetaId,
updCntr,
rmvId,
- size,
+ (int)size, // TODO: Partition size may be long
cntrsPageId,
state == null ? -1 : (byte)state.ordinal(),
pageCnt
@@ -549,7 +549,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
final int grpId,
final int partId,
final int currAllocatedPageCnt,
- final int partSize
+ final long partSize
) {
if (part != null) {
boolean reserved = part.reserve();
@@ -1301,7 +1301,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public int fullSize() {
+ @Override public long fullSize() {
try {
CacheDataStore delegate0 = init0(true);
@@ -1313,7 +1313,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public int cacheSize(int cacheId) {
+ @Override public long cacheSize(int cacheId) {
try {
CacheDataStore delegate0 = init0(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 9bfaaf3..945ef48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -490,7 +490,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@Override public AffinityTopologyVersion topologyVersion() {
AffinityTopologyVersion res = topVer;
- if (res.equals(AffinityTopologyVersion.NONE)) {
+ if (res == null || res.equals(AffinityTopologyVersion.NONE)) {
if (system()) {
AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fbdeca1..9fb8777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -545,10 +545,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version.
* @return Future that will be completed when all ongoing transactions are finished.
*/
- public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) {
+ public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer) {
GridCompoundFuture<IgniteInternalTx, Boolean> res =
new CacheObjectsReleaseFuture<>(
- "Tx",
+ "LocalTx",
topVer,
new IgniteReducer<IgniteInternalTx, Boolean>() {
@Override public boolean collect(IgniteInternalTx e) {
@@ -561,8 +561,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
});
for (IgniteInternalTx tx : txs()) {
- if (needWaitTransaction(tx, topVer))
+ if (needWaitTransaction(tx, topVer)) {
res.add(tx.finishFuture());
+ }
}
res.markInitialized();
@@ -571,6 +572,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Creates a future that will wait for finishing all tx updates on backups after all local transactions are finished.
+ *
+ * NOTE:
+ * As we send finish request to backup nodes after transaction successfully completed on primary node
+ * it's important to ensure that all updates from primary to backup are finished or at least remote transaction has created on backup node.
+ *
+ * @param finishLocalTxsFuture Local transactions finish future.
+ * @param topVer Topology version.
+ * @return Future that will be completed when all ongoing transactions are finished.
+ */
+ public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> finishLocalTxsFuture, AffinityTopologyVersion topVer) {
+ final GridCompoundFuture finishAllTxsFuture = new CacheObjectsReleaseFuture("AllTx", topVer);
+
+ // After finishing all local updates, wait for finishing all tx updates on backups.
+ finishLocalTxsFuture.listen(future -> {
+ finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer));
+ finishAllTxsFuture.markInitialized();
+ });
+
+ return finishAllTxsFuture;
+ }
+
+ /**
* @param tx Transaction.
* @param topVer Exchange version.
* @return {@code True} if need wait transaction for exchange.
@@ -1834,12 +1858,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return Finish future for related remote transactions.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
- GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+ public IgniteInternalFuture<IgniteInternalTx> remoteTxFinishFuture(GridCacheVersion nearVer) {
+ GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>();
for (final IgniteInternalTx tx : txs()) {
if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
- fut.add((IgniteInternalFuture) tx.finishFuture());
+ fut.add(tx.finishFuture());
}
fut.markInitialized();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
index 7263656..702b188 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
@@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac
cache = grid(g).cache(DEFAULT_CACHE_NAME);
for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) {
- int size = p.dataStore().fullSize();
+ long size = p.dataStore().fullSize();
assertTrue("Unexpected size: " + size, size <= 32);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 468bbc8..6c570d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
new file mode 100644
index 0000000..52cd033
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Lists;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests for {@link ExchangeLatchManager} functionality when latch coordinator is failed.
+ */
+public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbstractTest {
+ /** */
+ private static final String LATCH_NAME = "test";
+
+ /** 5 nodes. */
+ private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 0);
+
+ /** Wait before latch creation. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCreate = (mgr, syncLatch) -> {
+ try {
+ syncLatch.countDown();
+ syncLatch.await();
+
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ distributedLatch.countDown();
+
+ distributedLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** Wait before latch count down. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCountDown = (mgr, syncLatch) -> {
+ try {
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ syncLatch.countDown();
+ syncLatch.await();
+
+ distributedLatch.countDown();
+
+ distributedLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception ", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** Wait after all operations are successful. */
+ private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> all = (mgr, syncLatch) -> {
+ try {
+ Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer);
+
+ distributedLatch.countDown();
+
+ syncLatch.countDown();
+
+ distributedLatch.await();
+
+ syncLatch.await();
+ } catch (Exception e) {
+ log.error("Unexpected exception ", e);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test scenarios description:
+ *
+ * We have existing coordinator and 4 other nodes.
+ * Each node do following operations:
+ * 1) Create latch
+ * 2) Countdown latch
+ * 3) Await latch
+ *
+ * While nodes do the operations we shutdown coordinator and next oldest node become new coordinator.
+ * We should check that new coordinator properly restored latch and all nodes finished latch completion successfully after that.
+ *
+ * Each node before coordinator shutdown can be in 3 different states:
+ *
+ * State {@link #beforeCreate} - Node didn't create latch yet.
+ * State {@link #beforeCountDown} - Node created latch but didn't count down it yet.
+ * State {@link #all} - Node created latch and count downed it.
+ *
+ * We should check important cases when future coordinator is in one of these states, and other 3 nodes have 3 different states.
+ */
+
+ /**
+ * Scenario 1:
+ *
+ * Node 1 state -> {@link #beforeCreate}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail1() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ beforeCreate,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Scenario 2:
+ *
+ * Node 1 state -> {@link #beforeCountDown}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail2() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ beforeCountDown,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Scenario 3:
+ *
+ * Node 1 state -> {@link #all}
+ * Node 2 state -> {@link #beforeCountDown}
+ * Node 3 state -> {@link #all}
+ * Node 4 state -> {@link #beforeCreate}
+ */
+ public void testCoordinatorFail3() throws Exception {
+ List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList(
+ all,
+ beforeCountDown,
+ all,
+ beforeCreate
+ );
+
+ doTestCoordinatorFail(nodeStates);
+ }
+
+ /**
+ * Test latch coordinator fail with specified scenarios.
+ *
+ * @param nodeScenarios Node scenarios.
+ * @throws Exception If failed.
+ */
+ private void doTestCoordinatorFail(List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeScenarios) throws Exception {
+ IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5);
+ crd.cluster().active(true);
+
+ // Latch to synchronize node states.
+ CountDownLatch syncLatch = new CountDownLatch(5);
+
+ GridCompoundFuture finishAllLatches = new GridCompoundFuture();
+
+ AtomicBoolean hasErrors = new AtomicBoolean();
+
+ for (int node = 1; node < 5; node++) {
+ IgniteEx grid = grid(node);
+ ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
+ final int stateIdx = node - 1;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, syncLatch);
+ if (!success)
+ hasErrors.set(true);
+ }, 1, "latch-runner-" + node);
+
+ finishAllLatches.add(fut);
+ }
+
+ finishAllLatches.markInitialized();
+
+ // Wait while all nodes reaches their states.
+ while (syncLatch.getCount() != 1) {
+ Thread.sleep(10);
+
+ if (hasErrors.get())
+ throw new Exception("All nodes should complete latches without errors");
+ }
+
+ crd.close();
+
+ // Resume progress for all nodes.
+ syncLatch.countDown();
+
+ // Wait for distributed latch completion.
+ finishAllLatches.get(5000);
+
+ Assert.assertFalse("All nodes should complete latches without errors", hasErrors.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
new file mode 100644
index 0000000..63d772a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ *
+ */
+public class GridCachePartitionsStateValidationTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** */
+ private boolean clientMode;
+
+ /** {@inheritDoc */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ cfg.setCommunicationSpi(new SingleMessageInterceptorCommunicationSpi(2));
+
+ if (clientMode)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ clientMode = false;
+ }
+
+ /**
+ * Test that partitions state validation works correctly.
+ *
+ * @throws Exception If failed.
+ */
+ public void testValidationIfPartitionCountersAreInconsistent() throws Exception {
+ IgniteEx ignite = (IgniteEx) startGrids(2);
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ // Modify update counter for some partition.
+ for (GridDhtLocalPartition partition : ignite.cachex(CACHE_NAME).context().topology().localPartitions()) {
+ partition.updateCounter(100500L);
+ break;
+ }
+
+ // Trigger exchange.
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ // Nothing should happen (just log error message) and we're still able to put data to corrupted cache.
+ ignite.cache(CACHE_NAME).put(0, 0);
+
+ stopAllGrids();
+ }
+
+ /**
+ * Test that all nodes send correct {@link GridDhtPartitionsSingleMessage} with consistent update counters.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPartitionCountersConsistencyOnExchange() throws Exception {
+ IgniteEx ignite = (IgniteEx) startGrids(4);
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ final String atomicCacheName = "atomic-cache";
+ final String txCacheName = "tx-cache";
+
+ clientMode = true;
+
+ Ignite client = startGrid(4);
+
+ clientMode = false;
+
+ IgniteCache atomicCache = client.getOrCreateCache(new CacheConfiguration<>(atomicCacheName)
+ .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setBackups(2)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ IgniteCache txCache = client.getOrCreateCache(new CacheConfiguration<>(txCacheName)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setBackups(2)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ );
+
+ for (int it = 0; it < 10; it++) {
+ SingleMessageInterceptorCommunicationSpi spi = (SingleMessageInterceptorCommunicationSpi) ignite.configuration().getCommunicationSpi();
+ spi.clear();
+
+ // Stop load future.
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ // Run atomic load.
+ IgniteInternalFuture atomicLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+ int k = 0;
+
+ while (!stop.get()) {
+ k++;
+ try {
+ atomicCache.put(k, k);
+ } catch (Exception ignored) {}
+ }
+ }, 1, "atomic-load");
+
+ // Run tx load.
+ IgniteInternalFuture txLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+ final int txOps = 5;
+
+ while (!stop.get()) {
+ List<Integer> randomKeys = Stream.generate(() -> ThreadLocalRandom.current().nextInt(5))
+ .limit(txOps)
+ .sorted()
+ .collect(Collectors.toList());
+
+ try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+ for (Integer key : randomKeys)
+ txCache.put(key, key);
+
+ tx.commit();
+ }
+ catch (Exception ignored) { }
+ }
+ }, 4, "tx-load");
+
+ // Wait for some data.
+ Thread.sleep(1000);
+
+ // Prevent sending full message.
+ spi.blockFullMessage();
+
+ // Trigger exchange.
+ IgniteInternalFuture nodeStopFuture = GridTestUtils.runAsync(() -> stopGrid(3));
+
+ try {
+ spi.waitUntilAllSingleMessagesAreSent();
+
+ List<GridDhtPartitionsSingleMessage> interceptedMessages = spi.getMessages();
+
+ // Associate each message with existing node UUID.
+ Map<UUID, GridDhtPartitionsSingleMessage> messagesMap = new HashMap<>();
+ for (int i = 0; i < interceptedMessages.size(); i++)
+ messagesMap.put(grid(i + 1).context().localNodeId(), interceptedMessages.get(i));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(ignite.context().cache().context());
+
+ // Validate partition update counters. If counters are not consistent, exception will be thrown.
+ validator.validatePartitionsUpdateCounters(ignite.cachex(atomicCacheName).context().topology(), messagesMap, Collections.emptySet());
+ validator.validatePartitionsUpdateCounters(ignite.cachex(txCacheName).context().topology(), messagesMap, Collections.emptySet());
+
+ } finally {
+ // Stop load and resume exchange.
+ spi.unblockFullMessage();
+
+ stop.set(true);
+
+ atomicLoadFuture.get();
+ txLoadFuture.get();
+ nodeStopFuture.get();
+ }
+
+ // Return grid to initial state.
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ * SPI which intercepts single messages during exchange.
+ */
+ private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private static final List<GridDhtPartitionsSingleMessage> messages = new CopyOnWriteArrayList<>();
+
+ /** Future completes when {@link #singleMessagesThreshold} messages are sent to coordinator. */
+ private static final GridFutureAdapter allSingleMessagesSent = new GridFutureAdapter();
+
+ /** A number of single messages we're waiting for send. */
+ private final int singleMessagesThreshold;
+
+ /** Latch which blocks full message sending. */
+ private volatile CountDownLatch blockFullMsgLatch;
+
+ /**
+ * Constructor.
+ */
+ private SingleMessageInterceptorCommunicationSpi(int singleMessagesThreshold) {
+ this.singleMessagesThreshold = singleMessagesThreshold;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) ((GridIoMessage) msg).message();
+
+ // We're interesting for only exchange messages and when node is stopped.
+ if (singleMsg.exchangeId() != null && singleMsg.exchangeId().isLeft() && !singleMsg.client()) {
+ messages.add(singleMsg);
+
+ if (messages.size() == singleMessagesThreshold)
+ allSingleMessagesSent.onDone();
+ }
+ }
+
+ try {
+ if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsFullMessage) {
+ if (blockFullMsgLatch != null)
+ blockFullMsgLatch.await();
+ }
+ }
+ catch (Exception ignored) { }
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /** */
+ public void clear() {
+ messages.clear();
+ allSingleMessagesSent.reset();
+ }
+
+ /** */
+ public List<GridDhtPartitionsSingleMessage> getMessages() {
+ return Collections.unmodifiableList(messages);
+ }
+
+ /** */
+ public void blockFullMessage() {
+ blockFullMsgLatch = new CountDownLatch(1);
+ }
+
+ /** */
+ public void unblockFullMessage() {
+ blockFullMsgLatch.countDown();
+ }
+
+ /** */
+ public void waitUntilAllSingleMessagesAreSent() throws IgniteCheckedException {
+ allSingleMessagesSent.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
new file mode 100644
index 0000000..9ed8d54
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Test correct behaviour of {@link GridDhtPartitionsStateValidator} class.
+ */
+public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstractTest {
+ /** Mocks and stubs. */
+ private final UUID localNodeId = UUID.randomUUID();
+ /** */
+ private GridCacheSharedContext cctxMock;
+ /** */
+ private GridDhtPartitionTopology topologyMock;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Prepare mocks.
+ cctxMock = Mockito.mock(GridCacheSharedContext.class);
+ Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+ topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+ Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+ Mockito.when(topologyMock.groupId()).thenReturn(0);
+ Mockito.when(topologyMock.partitions()).thenReturn(3);
+
+ List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
+ partitionMock(0, 1, 1),
+ partitionMock(1, 2, 2),
+ partitionMock(2, 3, 3)
+ );
+ Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+ Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+ }
+
+ /**
+ * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}.
+ */
+ private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+ GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+ Mockito.when(partitionMock.id()).thenReturn(id);
+ Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+ Mockito.when(partitionMock.fullSize()).thenReturn(size);
+ return partitionMock;
+ }
+
+ /**
+ * @return Message containing specified {@code countersMap}.
+ */
+ private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, T2<Long, Long>> countersMap) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+ msg.addPartitionUpdateCounters(0, countersMap);
+ return msg;
+ }
+
+ /**
+ * @return Message containing specified {@code sizesMap}.
+ */
+ private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> sizesMap) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+ msg.addPartitionSizes(0, sizesMap);
+ return msg;
+ }
+
+ /**
+ * Test partition update counters validation.
+ */
+ public void testPartitionCountersValidation() {
+ UUID remoteNode = UUID.randomUUID();
+ UUID ignoreNode = UUID.randomUUID();
+
+ // For partitions 0 and 2 (zero counter) we have inconsistent update counters.
+ Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+ updateCountersMap.put(0, new T2<>(2L, 2L));
+ updateCountersMap.put(1, new T2<>(2L, 2L));
+
+ // Form single messages map.
+ Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+ messages.put(remoteNode, fromUpdateCounters(updateCountersMap));
+ messages.put(ignoreNode, fromUpdateCounters(updateCountersMap));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+ // (partId, (nodeId, updateCounter))
+ Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsUpdateCounters(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+ // Check that validation result contains all necessary information.
+ Assert.assertEquals(2, result.size());
+ Assert.assertTrue(result.containsKey(0));
+ Assert.assertTrue(result.containsKey(2));
+ Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+ Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+ Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+ Assert.assertTrue(result.get(2).get(remoteNode) == 0L);
+ }
+
+ /**
+ * Test partition cache sizes validation.
+ */
+ public void testPartitionCacheSizesValidation() {
+ UUID remoteNode = UUID.randomUUID();
+ UUID ignoreNode = UUID.randomUUID();
+
+ // For partitions 0 and 2 we have inconsistent cache sizes.
+ Map<Integer, Long> cacheSizesMap = new HashMap<>();
+ cacheSizesMap.put(0, 2L);
+ cacheSizesMap.put(1, 2L);
+ cacheSizesMap.put(2, 2L);
+
+ // Form single messages map.
+ Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+ messages.put(remoteNode, fromCacheSizes(cacheSizesMap));
+ messages.put(ignoreNode, fromCacheSizes(cacheSizesMap));
+
+ GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+ // (partId, (nodeId, cacheSize))
+ Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsSizes(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+ // Check that validation result contains all necessary information.
+ Assert.assertEquals(2, result.size());
+ Assert.assertTrue(result.containsKey(0));
+ Assert.assertTrue(result.containsKey(2));
+ Assert.assertTrue(result.get(0).get(localNodeId) == 1L);
+ Assert.assertTrue(result.get(0).get(remoteNode) == 2L);
+ Assert.assertTrue(result.get(2).get(localNodeId) == 3L);
+ Assert.assertTrue(result.get(2).get(remoteNode) == 2L);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
new file mode 100644
index 0000000..03ea0f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T1;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class TxOptimisticOnPartitionExchangeTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_CNT = 3;
+
+ /** Tx size. */
+ private static final int TX_SIZE = 20 * NODES_CNT;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Logger started. */
+ private static volatile boolean msgInterception;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODES_CNT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi(log()));
+
+ cfg.setCacheConfiguration(defaultCacheConfiguration()
+ .setName(CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setCacheMode(PARTITIONED)
+ .setBackups(1));
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConsistencyOnPartitionExchange() throws Exception {
+ doTest(SERIALIZABLE, true);
+ doTest(READ_COMMITTED, true);
+ doTest(SERIALIZABLE, false);
+ doTest(READ_COMMITTED, false);
+ }
+
+ /**
+ * @param isolation {@link TransactionIsolation}.
+ * @param txInitiatorPrimary False If the transaction does not use the keys of the node that initiated it.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void doTest(final TransactionIsolation isolation, boolean txInitiatorPrimary) throws Exception {
+ final CountDownLatch txStarted = new CountDownLatch(1);
+
+ final IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+ final Map<Integer, Integer> txValues = new TreeMap<>();
+
+ ClusterNode node = ignite(0).cluster().node();
+
+ GridCacheAffinityManager affinity = ((IgniteCacheProxy)cache).context().affinity();
+
+ for (int i = 0; txValues.size() < TX_SIZE; i++) {
+ if (!txInitiatorPrimary && node.equals(affinity.primaryByKey(i, NONE)))
+ continue;
+
+ txValues.put(i, i);
+ }
+
+ TestCommunicationSpi.init();
+
+ msgInterception = true;
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ try (Transaction tx = ignite(0).transactions().txStart(OPTIMISTIC, isolation)) {
+ info(">>> TX started.");
+
+ txStarted.countDown();
+
+ cache.putAll(txValues);
+
+ tx.commit();
+
+ info(">>> TX committed.");
+ }
+
+ return null;
+ }
+ });
+
+ txStarted.await();
+
+ try {
+ info(">>> Grid starting.");
+
+ IgniteEx ignite = startGrid(NODES_CNT);
+
+ info(">>> Grid started.");
+
+ fut.get();
+
+ awaitPartitionMapExchange();
+
+ msgInterception = false;
+
+ IgniteCache<Object, Object> cacheStartedNode = ignite.cache(CACHE_NAME);
+
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Set<Object> keys = cacheStartedNode.getAll(txValues.keySet()).keySet();
+
+ assertEquals(txValues.keySet(), new TreeSet<>(keys));
+
+ tx.commit();
+ }
+ }
+ finally {
+ msgInterception = false;
+
+ stopGrid(NODES_CNT);
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("ConstantConditions")
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Partition single message sent from added node. */
+ private static volatile CountDownLatch partSingleMsgSentFromAddedNode;
+
+ /** Partition supply message sent count. */
+ private static final AtomicInteger partSupplyMsgSentCnt = new AtomicInteger();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /**
+ * @param log Logger.
+ */
+ public TestCommunicationSpi(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /**
+ *
+ */
+ public static void init() {
+ partSingleMsgSentFromAddedNode = new CountDownLatch(1);
+
+ partSupplyMsgSentCnt.set(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) throws IgniteSpiException {
+ if (msgInterception) {
+ if (msg instanceof GridIoMessage) {
+ final Message msg0 = ((GridIoMessage)msg).message();
+
+ String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+ int nodeIdx = Integer.parseInt(locNodeId.substring(locNodeId.length() - 3));
+
+ if (nodeIdx == 0) {
+ if (msg0 instanceof GridNearTxPrepareRequest || msg0 instanceof GridDhtTxPrepareRequest) {
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ partSingleMsgSentFromAddedNode.await();
+
+ sendMessage(node, msg, ackC, true);
+
+ return null;
+ }
+ });
+
+ return;
+
+ }
+ else if (msg0 instanceof GridNearTxFinishRequest || msg0 instanceof GridDhtTxFinishRequest) {
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final T1<Integer> i = new T1<>(0);
+
+ while (waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return partSupplyMsgSentCnt.get() > i.get();
+ }
+ }, i.get() == 0 ? 5_000 : 500))
+ i.set(partSupplyMsgSentCnt.get());
+
+ sendMessage(node, msg, ackC, true);
+
+ return null;
+ }
+ });
+
+ return;
+ }
+ }
+ else if (nodeIdx == NODES_CNT && msg0 instanceof GridDhtPartitionsSingleMessage)
+ partSingleMsgSentFromAddedNode.countDown();
+
+ if (msg0 instanceof GridDhtPartitionSupplyMessage)
+ partSupplyMsgSentCnt.incrementAndGet();
+ }
+ }
+
+ sendMessage(node, msg, ackC, msgInterception);
+ }
+
+ /**
+ * @param node Node.
+ * @param msg Message.
+ * @param ackC Ack closure.
+ * @param logMsg Log Messages.
+ */
+ private void sendMessage(
+ final ClusterNode node,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC,
+ boolean logMsg
+ ) throws IgniteSpiException {
+ if (logMsg) {
+ String id = node.id().toString();
+ String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+ Message msg0 = ((GridIoMessage)msg).message();
+
+ log.info(
+ String.format(">>> Output msg[type=%s, fromNode= %s, toNode=%s]",
+ msg0.getClass().getSimpleName(),
+ locNodeId.charAt(locNodeId.length() - 1),
+ id.charAt(id.length() - 1)
+ )
+ );
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..0612615 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheT
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
@@ -292,6 +294,8 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class);
suite.addTestSuite(CacheDeferredDeleteQueueTest.class);
+ suite.addTestSuite(GridCachePartitionsStateValidatorSelfTest.class);
+ suite.addTestSuite(GridCachePartitionsStateValidationTest.class);
suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite());
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f8add30..415479d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim
import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest;
import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticT
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
@@ -93,6 +95,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class);
suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
+ suite.addTestSuite(TxOptimisticOnPartitionExchangeTest.class);
+
+ suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class);
+
return suite;
}
}
[07/10] ignite git commit: IGNITE-8111 Add extra validation for WAL
segment size - Fixes #3768.
Posted by ak...@apache.org.
IGNITE-8111 Add extra validation for WAL segment size - Fixes #3768.
Signed-off-by: dpavlov <dp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97524668
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97524668
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97524668
Branch: refs/heads/ignite-8201
Commit: 975246687c9d143830501340e597a35d1a4c492a
Parents: a4653b7
Author: denis.garus <d....@isimplelab.com>
Authored: Wed Apr 11 13:01:22 2018 +0300
Committer: dpavlov <dp...@apache.org>
Committed: Wed Apr 11 13:01:22 2018 +0300
----------------------------------------------------------------------
.../configuration/DataStorageConfiguration.java | 6 ++--
.../DataStorageConfigurationValidationTest.java | 33 ++++++++++++++++++--
.../db/wal/IgniteWalFlushFailoverTest.java | 4 +--
...lFlushMultiNodeFailoverAbstractSelfTest.java | 4 +--
4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index a433760..747efd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -533,12 +533,14 @@ public class DataStorageConfiguration implements Serializable {
/**
* Sets size of a WAL segment.
+ * If value is not set (or zero), {@link #DFLT_WAL_SEGMENT_SIZE} will be used.
*
- * @param walSegmentSize WAL segment size. 64 MB is used by default. Maximum value is 2Gb.
+ * @param walSegmentSize WAL segment size. Value must be between 512Kb and 2Gb.
* @return {@code This} for chaining.
*/
public DataStorageConfiguration setWalSegmentSize(int walSegmentSize) {
- A.ensure(walSegmentSize >= 0, "WAL segment size must be non-negative and less than 2 Gb.");
+ if (walSegmentSize != 0)
+ A.ensure(walSegmentSize >= 512 * 1024, "WAL segment size must be between 512Kb and 2Gb.");
this.walSegmentSize = walSegmentSize;
http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
index 7f667ee..9471a82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/DataStorageConfigurationValidationTest.java
@@ -31,11 +31,10 @@ public class DataStorageConfigurationValidationTest extends TestCase {
*
* @throws Exception If failed.
*/
- public void testWalSegmentSizeOveflow() throws Exception {
+ public void testWalSegmentSizeOverflow() throws Exception {
final DataStorageConfiguration cfg = new DataStorageConfiguration();
GridTestUtils.assertThrows(null, new Callable<Void>() {
- /** {@inheritDoc} */
@Override public Void call() {
cfg.setWalSegmentSize(1 << 31);
@@ -43,4 +42,34 @@ public class DataStorageConfigurationValidationTest extends TestCase {
}
}, IllegalArgumentException.class, null);
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSetWalSegmentSizeShouldThrowExceptionWhenSizeLessThen512Kb() throws Exception {
+ final DataStorageConfiguration cfg = new DataStorageConfiguration();
+
+ GridTestUtils.assertThrows(null, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cfg.setWalSegmentSize(512 * 1024 - 1);
+
+ return null;
+ }
+ }, IllegalArgumentException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSetWalSegmentSizeShouldBeOkWhenSizeBetween512KbAnd2Gb() throws Exception {
+ final DataStorageConfiguration cfg = new DataStorageConfiguration();
+
+ cfg.setWalSegmentSize(512 * 1024);
+
+ assertEquals(512 * 1024, cfg.getWalSegmentSize());
+
+ cfg.setWalSegmentSize(Integer.MAX_VALUE);
+
+ assertEquals(Integer.MAX_VALUE, cfg.getWalSegmentSize());
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 042a447..351a42c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -92,8 +92,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
.setWalMode(WALMode.BACKGROUND)
- .setWalBufferSize(128 * 1024)// Setting WAL Segment size to high values forces flushing by timeout.
- .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+ .setWalBufferSize(1024 * 1024)// Setting WAL Segment size to high values forces flushing by timeout.
+ .setWalSegmentSize(flushByTimeout ? 2 * 1024 * 1024 : 512 * 1024);
cfg.setDataStorageConfiguration(memCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/97524668/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index fe16328..cc0986a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -106,8 +106,8 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(2048L * 1024 * 1024).setPersistenceEnabled(true))
.setWalMode(this.walMode())
- .setWalSegmentSize(50_000)
- .setWalBufferSize(50_000);
+ .setWalSegmentSize(512 * 1024)
+ .setWalBufferSize(512 * 1024);
cfg.setDataStorageConfiguration(memCfg);
[08/10] ignite git commit: IGNITE-4091 Web Console: Refactored using
of internal Angular API.
Posted by ak...@apache.org.
IGNITE-4091 Web Console: Refactored using of internal Angular API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74d25456
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74d25456
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74d25456
Branch: refs/heads/ignite-8201
Commit: 74d254564a44a95db9945652c9b579ed6b431ee9
Parents: 9752466
Author: Alexander Kalinin <ve...@yandex.ru>
Authored: Wed Apr 11 17:09:41 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Apr 11 17:09:41 2018 +0700
----------------------------------------------------------------------
modules/web-console/frontend/app/app.config.js | 14 +++---
.../components/modal-import-models/component.js | 4 +-
.../app/components/page-profile/controller.js | 4 +-
.../frontend/app/modules/ace.module.js | 47 ++++++++++----------
.../services/AngularStrapSelect.decorator.js | 5 ++-
.../services/AngularStrapTooltip.decorator.js | 8 ++--
.../frontend/app/services/FormUtils.service.js | 3 +-
7 files changed, 45 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/app.config.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/app.config.js b/modules/web-console/frontend/app/app.config.js
index 9d8dc99..e2bc057 100644
--- a/modules/web-console/frontend/app/app.config.js
+++ b/modules/web-console/frontend/app/app.config.js
@@ -43,7 +43,7 @@ igniteConsoleCfg.config(['$animateProvider', ($animateProvider) => {
// AngularStrap modal popup configuration.
igniteConsoleCfg.config(['$modalProvider', ($modalProvider) => {
- angular.extend($modalProvider.defaults, {
+ Object.assign($modalProvider.defaults, {
animation: 'am-fade-and-scale',
placement: 'center',
html: true
@@ -52,7 +52,7 @@ igniteConsoleCfg.config(['$modalProvider', ($modalProvider) => {
// AngularStrap popover configuration.
igniteConsoleCfg.config(['$popoverProvider', ($popoverProvider) => {
- angular.extend($popoverProvider.defaults, {
+ Object.assign($popoverProvider.defaults, {
trigger: 'manual',
placement: 'right',
container: 'body',
@@ -62,7 +62,7 @@ igniteConsoleCfg.config(['$popoverProvider', ($popoverProvider) => {
// AngularStrap tooltips configuration.
igniteConsoleCfg.config(['$tooltipProvider', ($tooltipProvider) => {
- angular.extend($tooltipProvider.defaults, {
+ Object.assign($tooltipProvider.defaults, {
container: 'body',
delay: {show: 150, hide: 150},
placement: 'right',
@@ -73,7 +73,7 @@ igniteConsoleCfg.config(['$tooltipProvider', ($tooltipProvider) => {
// AngularStrap select (combobox) configuration.
igniteConsoleCfg.config(['$selectProvider', ($selectProvider) => {
- angular.extend($selectProvider.defaults, {
+ Object.assign($selectProvider.defaults, {
container: 'body',
maxLength: '5',
allText: 'Select All',
@@ -87,7 +87,7 @@ igniteConsoleCfg.config(['$selectProvider', ($selectProvider) => {
// AngularStrap alerts configuration.
igniteConsoleCfg.config(['$alertProvider', ($alertProvider) => {
- angular.extend($alertProvider.defaults, {
+ Object.assign($alertProvider.defaults, {
container: 'body',
placement: 'top-right',
duration: '5',
@@ -99,7 +99,7 @@ igniteConsoleCfg.config(['$alertProvider', ($alertProvider) => {
// AngularStrap dropdowns () configuration.
igniteConsoleCfg.config(['$dropdownProvider', ($dropdownProvider) => {
- angular.extend($dropdownProvider.defaults, {
+ Object.assign($dropdownProvider.defaults, {
templateUrl: dropdownTemplateUrl,
animation: ''
});
@@ -107,7 +107,7 @@ igniteConsoleCfg.config(['$dropdownProvider', ($dropdownProvider) => {
// AngularStrap dropdowns () configuration.
igniteConsoleCfg.config(['$datepickerProvider', ($datepickerProvider) => {
- angular.extend($datepickerProvider.defaults, {
+ Object.assign($datepickerProvider.defaults, {
autoclose: true,
iconLeft: 'icon-datepicker-left',
iconRight: 'icon-datepicker-right'
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js b/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
index 7f852b0..813c998 100644
--- a/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
+++ b/modules/web-console/frontend/app/components/page-configure/components/modal-import-models/component.js
@@ -84,7 +84,7 @@ const DFLT_REPLICATED_CACHE = {
const CACHE_TEMPLATES = [DFLT_PARTITIONED_CACHE, DFLT_REPLICATED_CACHE];
export class ModalImportModels {
- /**
+ /**
* Cluster ID to import models into
* @type {string}
*/
@@ -771,7 +771,7 @@ export class ModalImportModels {
// Prepare caches for generation.
if (table.action === IMPORT_DM_NEW_CACHE) {
- const newCache = angular.copy(this.loadedCaches[table.cacheOrTemplate]);
+ const newCache = _.cloneDeep(this.loadedCaches[table.cacheOrTemplate]);
batchAction.newCache = newCache;
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/components/page-profile/controller.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/components/page-profile/controller.js b/modules/web-console/frontend/app/components/page-profile/controller.js
index 05fe118..c67a603 100644
--- a/modules/web-console/frontend/app/components/page-profile/controller.js
+++ b/modules/web-console/frontend/app/components/page-profile/controller.js
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+import _ from 'lodash';
+
export default class PageProfileController {
static $inject = [
'$rootScope', '$scope', '$http', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteFocus', 'IgniteConfirm', 'IgniteCountries', 'User'
@@ -28,7 +30,7 @@ export default class PageProfileController {
this.ui = {};
this.User.read()
- .then((user) => this.ui.user = angular.copy(user));
+ .then((user) => this.ui.user = _.cloneDeep(user));
this.ui.countries = this.Countries.getAll();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/modules/ace.module.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/ace.module.js b/modules/web-console/frontend/app/modules/ace.module.js
index a28536a..6a6e70a 100644
--- a/modules/web-console/frontend/app/modules/ace.module.js
+++ b/modules/web-console/frontend/app/modules/ace.module.js
@@ -16,12 +16,13 @@
*/
import angular from 'angular';
+import _ from 'lodash';
angular
.module('ignite-console.ace', [])
.constant('igniteAceConfig', {})
.directive('igniteAce', ['igniteAceConfig', (aceConfig) => {
- if (angular.isUndefined(window.ace))
+ if (_.isUndefined(window.ace))
throw new Error('ignite-ace need ace to work... (o rly?)');
/**
@@ -43,7 +44,7 @@ angular
*/
const setOptions = (acee, session, opts) => {
// Sets the ace worker path, if running from concatenated or minified source.
- if (angular.isDefined(opts.workerPath)) {
+ if (!_.isUndefined(opts.workerPath)) {
const config = window.ace.acequire('ace/config');
config.set('workerPath', opts.workerPath);
@@ -53,26 +54,26 @@ angular
_.forEach(opts.require, (n) => window.ace.acequire(n));
// Boolean options.
- if (angular.isDefined(opts.showGutter))
+ if (!_.isUndefined(opts.showGutter))
acee.renderer.setShowGutter(opts.showGutter);
- if (angular.isDefined(opts.useWrapMode))
+ if (!_.isUndefined(opts.useWrapMode))
session.setUseWrapMode(opts.useWrapMode);
- if (angular.isDefined(opts.showInvisibles))
+ if (!_.isUndefined(opts.showInvisibles))
acee.renderer.setShowInvisibles(opts.showInvisibles);
- if (angular.isDefined(opts.showIndentGuides))
+ if (!_.isUndefined(opts.showIndentGuides))
acee.renderer.setDisplayIndentGuides(opts.showIndentGuides);
- if (angular.isDefined(opts.useSoftTabs))
+ if (!_.isUndefined(opts.useSoftTabs))
session.setUseSoftTabs(opts.useSoftTabs);
- if (angular.isDefined(opts.showPrintMargin))
+ if (!_.isUndefined(opts.showPrintMargin))
acee.setShowPrintMargin(opts.showPrintMargin);
// Commands.
- if (angular.isDefined(opts.disableSearch) && opts.disableSearch) {
+ if (!_.isUndefined(opts.disableSearch) && opts.disableSearch) {
acee.commands.addCommands([{
name: 'unfind',
bindKey: {
@@ -85,21 +86,21 @@ angular
}
// Base options.
- if (angular.isString(opts.theme))
+ if (_.isString(opts.theme))
acee.setTheme('ace/theme/' + opts.theme);
- if (angular.isString(opts.mode))
+ if (_.isString(opts.mode))
session.setMode('ace/mode/' + opts.mode);
- if (angular.isDefined(opts.firstLineNumber)) {
- if (angular.isNumber(opts.firstLineNumber))
+ if (!_.isUndefined(opts.firstLineNumber)) {
+ if (_.isNumber(opts.firstLineNumber))
session.setOption('firstLineNumber', opts.firstLineNumber);
- else if (angular.isFunction(opts.firstLineNumber))
+ else if (_.isFunction(opts.firstLineNumber))
session.setOption('firstLineNumber', opts.firstLineNumber());
}
// Advanced options.
- if (angular.isDefined(opts.advanced)) {
+ if (!_.isUndefined(opts.advanced)) {
for (const key in opts.advanced) {
if (opts.advanced.hasOwnProperty(key)) {
// Create a javascript object with the key and value.
@@ -112,7 +113,7 @@ angular
}
// Advanced options for the renderer.
- if (angular.isDefined(opts.rendererOptions)) {
+ if (!_.isUndefined(opts.rendererOptions)) {
for (const key in opts.rendererOptions) {
if (opts.rendererOptions.hasOwnProperty(key)) {
// Create a javascript object with the key and value.
@@ -126,7 +127,7 @@ angular
// onLoad callbacks.
_.forEach(opts.callbacks, (cb) => {
- if (angular.isFunction(cb))
+ if (_.isFunction(cb))
cb(acee);
});
};
@@ -147,7 +148,7 @@ angular
*
* @type object
*/
- let opts = angular.extend({}, options, scope.$eval(attrs.igniteAce));
+ let opts = Object.assign({}, options, scope.$eval(attrs.igniteAce));
/**
* ACE editor.
@@ -191,9 +192,9 @@ angular
!scope.$$phase && !scope.$root.$$phase)
scope.$eval(() => ngModel.$setViewValue(newValue));
- if (angular.isDefined(callback)) {
+ if (!_.isUndefined(callback)) {
scope.$evalAsync(() => {
- if (angular.isFunction(callback))
+ if (_.isFunction(callback))
callback([e, acee]);
else
throw new Error('ignite-ace use a function as callback');
@@ -210,10 +211,10 @@ angular
form && form.$removeControl(ngModel);
ngModel.$formatters.push((value) => {
- if (angular.isUndefined(value) || value === null)
+ if (_.isUndefined(value) || value === null)
return '';
- if (angular.isObject(value) || angular.isArray(value))
+ if (_.isObject(value) || _.isArray(value))
throw new Error('ignite-ace cannot use an object or an array as a model');
return value;
@@ -229,7 +230,7 @@ angular
if (current === previous)
return;
- opts = angular.extend({}, options, scope.$eval(attrs.igniteAce));
+ opts = Object.assign({}, options, scope.$eval(attrs.igniteAce));
opts.callbacks = [opts.onLoad];
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js b/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
index 39f7ccd..32fa167 100644
--- a/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
+++ b/modules/web-console/frontend/app/services/AngularStrapSelect.decorator.js
@@ -16,6 +16,7 @@
*/
import angular from 'angular';
+import _ from 'lodash';
/**
* Special decorator that fix problem in AngularStrap selectAll / deselectAll methods.
@@ -27,12 +28,12 @@ export default angular.module('mgcrea.ngStrap.select')
const delegate = $delegate(element, controller, config);
// Common vars.
- const options = angular.extend({}, $delegate.defaults, config);
+ const options = Object.assign({}, $delegate.defaults, config);
const scope = delegate.$scope;
const valueByIndex = (index) => {
- if (angular.isUndefined(scope.$matches[index]))
+ if (_.isUndefined(scope.$matches[index]))
return null;
return scope.$matches[index].value;
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js b/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
index d01a450..fa59f32 100644
--- a/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
+++ b/modules/web-console/frontend/app/services/AngularStrapTooltip.decorator.js
@@ -16,7 +16,7 @@
*/
import angular from 'angular';
-import flow from 'lodash/flow';
+import _ from 'lodash';
/**
* Decorator that fix problem in AngularStrap $tooltip.
@@ -62,7 +62,7 @@ export default angular
scope.$emit(options.prefixEvent + '.hide.before', $tooltip);
- if (angular.isDefined(options.onBeforeHide) && angular.isFunction(options.onBeforeHide))
+ if (!_.isUndefined(options.onBeforeHide) && _.isFunction(options.onBeforeHide))
options.onBeforeHide($tooltip);
$tooltip.$isShown = scope.$isShown = false;
@@ -82,8 +82,8 @@ export default angular
const $tooltip = $delegate(el, config);
$tooltip.$referenceElement = el;
- $tooltip.destroy = flow($tooltip.destroy, () => $tooltip.$referenceElement = null);
- $tooltip.$applyPlacement = flow($tooltip.$applyPlacement, () => {
+ $tooltip.destroy = _.flow($tooltip.destroy, () => $tooltip.$referenceElement = null);
+ $tooltip.$applyPlacement = _.flow($tooltip.$applyPlacement, () => {
if (!$tooltip.$element)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/74d25456/modules/web-console/frontend/app/services/FormUtils.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/services/FormUtils.service.js b/modules/web-console/frontend/app/services/FormUtils.service.js
index f22d4bc..da1d737 100644
--- a/modules/web-console/frontend/app/services/FormUtils.service.js
+++ b/modules/web-console/frontend/app/services/FormUtils.service.js
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import _ from 'lodash';
export default ['IgniteFormUtils', ['$window', 'IgniteFocus', ($window, Focus) => {
function ensureActivePanel(ui, pnl, focusId) {
@@ -41,7 +42,7 @@ export default ['IgniteFormUtils', ['$window', 'IgniteFocus', ($window, Focus) =
if (!activePanels || activePanels.length < 1)
ui.activePanels = [idx];
else if (!_.includes(activePanels, idx)) {
- const newActivePanels = angular.copy(activePanels);
+ const newActivePanels = _.cloneDeep(activePanels);
newActivePanels.push(idx);
[05/10] ignite git commit: IGNITE-8216 Fixed javadoc for release build
Posted by ak...@apache.org.
IGNITE-8216 Fixed javadoc for release build
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6557fe62
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6557fe62
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6557fe62
Branch: refs/heads/ignite-8201
Commit: 6557fe62696ac24c740e445b53482da298b59b27
Parents: 780fc07
Author: Sergey Chugunov <se...@gmail.com>
Authored: Wed Apr 11 12:28:40 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 11 12:28:40 2018 +0300
----------------------------------------------------------------------
parent/pom.xml | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6557fe62/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 16a9395..3decc16 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -404,6 +404,10 @@
<packages>org.apache.ignite.spi.eventstorage*</packages>
</group>
<group>
+ <title>Communication Failure Detection</title>
+ <packages>org.apache.ignite.failure</packages>
+ </group>
+ <group>
<title>Segmentation Detection</title>
<packages>org.apache.ignite.plugin.segmentation</packages>
</group>
[03/10] ignite git commit: IGNITE-7871 Implemented additional
synchronization phase for correct partition counters update
Posted by ak...@apache.org.
IGNITE-7871 Implemented additional synchronization phase for correct partition counters update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da77b981
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da77b981
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da77b981
Branch: refs/heads/ignite-8201
Commit: da77b9818a70495b7afdf6899ebd9180dadd7f68
Parents: f4de6df
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 11 11:23:46 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 11 11:23:46 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../communication/GridIoMessageFactory.java | 6 +
.../discovery/GridDiscoveryManager.java | 10 +
.../MetaPageUpdatePartitionDataRecord.java | 2 +-
.../processors/cache/CacheMetricsImpl.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 38 +
.../GridCachePartitionExchangeManager.java | 17 +
.../cache/GridCacheSharedContext.java | 9 +-
.../processors/cache/GridCacheUtils.java | 2 +-
.../cache/IgniteCacheOffheapManager.java | 8 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 10 +-
.../dht/GridClientPartitionTopology.java | 5 +
.../distributed/dht/GridDhtLocalPartition.java | 9 +-
.../dht/GridDhtPartitionTopology.java | 6 +
.../dht/GridDhtPartitionTopologyImpl.java | 26 +-
.../dht/GridDhtPartitionsStateValidator.java | 255 +++++++
.../cache/distributed/dht/GridDhtTxLocal.java | 5 +
.../GridDhtPartitionsExchangeFuture.java | 96 ++-
.../GridDhtPartitionsSingleMessage.java | 68 +-
.../dht/preloader/InitNewCoordinatorFuture.java | 2 +-
.../preloader/latch/ExchangeLatchManager.java | 695 +++++++++++++++++++
.../distributed/dht/preloader/latch/Latch.java | 52 ++
.../dht/preloader/latch/LatchAckMessage.java | 165 +++++
.../cache/distributed/near/GridNearTxLocal.java | 10 +
.../persistence/GridCacheOffheapManager.java | 10 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 36 +-
...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +-
.../processors/cache/IgniteCacheGroupsTest.java | 1 +
...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++
.../GridCachePartitionsStateValidationTest.java | 316 +++++++++
...idCachePartitionsStateValidatorSelfTest.java | 158 +++++
.../TxOptimisticOnPartitionExchangeTest.java | 322 +++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 4 +
.../testsuites/IgniteCacheTestSuite6.java | 6 +
35 files changed, 2568 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 1227e8c..0b2d41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -124,7 +124,10 @@ public enum GridTopic {
TOPIC_METRICS,
/** */
- TOPIC_AUTH;
+ TOPIC_AUTH,
+
+ /** */
+ TOPIC_EXCHANGE;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5616fd0..581c32e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -921,6 +922,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 135:
+ msg = new LatchAckMessage();
+
+ break;
+
// [-3..119] [124..129] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a1d84e5..400bb5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -793,6 +793,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onDisconnected();
+ if (!locJoin.isDone())
+ locJoin.onDone(new IgniteCheckedException("Node disconnected"));
+
locJoin = new GridFutureAdapter<>();
registeredCaches.clear();
@@ -2142,6 +2145,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @return Local join future.
+ */
+ public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() {
+ return locJoin;
+ }
+
+ /**
* @param msg Custom message.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index bafbf47..e5bd343 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -32,7 +32,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
/** */
private long globalRmvId;
- /** */
+ /** TODO: Partition size may be long */
private int partSize;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6fae8fe..b402ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -792,7 +792,7 @@ public class CacheMetricsImpl implements CacheMetrics {
if (cctx.cache() == null)
continue;
- int cacheSize = part.dataStore().cacheSize(cctx.cacheId());
+ long cacheSize = part.dataStore().cacheSize(cctx.cacheId());
offHeapEntriesCnt += cacheSize;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a9fa3c7..fade833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -314,6 +316,42 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Creates a future that will wait for finishing all remote transactions (primary -> backup)
+ * with topology version less or equal to {@code topVer}.
+ *
+ * @param topVer Topology version.
+ * @return Compound future of all {@link GridDhtTxFinishFuture} futures.
+ */
+ public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) {
+ GridCompoundFuture<?, ?> res = new CacheObjectsReleaseFuture<>("RemoteTx", topVer);
+
+ for (GridCacheFuture<?> fut : futs.values()) {
+ if (fut instanceof GridDhtTxFinishFuture) {
+ GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture) fut;
+
+ if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer))
+ res.add(ignoreErrors(finishTxFuture));
+ }
+ }
+
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Future wrapper which ignores any underlying future errors.
+ *
+ * @param f Underlying future.
+ * @return Future wrapper which ignore any underlying future errors.
+ */
+ private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
+ GridFutureAdapter<?> wrapper = new GridFutureAdapter();
+ f.listen(future -> wrapper.onDone());
+ return wrapper;
+ }
+
+ /**
* @param leftNodeId Left node ID.
* @param topVer Topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1a0e65f..20a3ccb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -216,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** For tests only. */
private volatile AffinityTopologyVersion exchMergeTestWaitVer;
+ /** Distributed latch manager. */
+ private ExchangeLatchManager latchMgr;
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -309,6 +313,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker = new ExchangeWorker();
+ latchMgr = new ExchangeLatchManager(cctx.kernalContext());
+
cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
@@ -1255,6 +1261,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
m.addPartitionUpdateCounters(grp.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+ m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes());
}
}
}
@@ -1277,6 +1285,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
m.addPartitionUpdateCounters(top.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+
+ m.addPartitionSizes(top.groupId(), top.partitionSizes());
}
}
@@ -1570,6 +1580,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @return Latch manager instance.
+ */
+ public ExchangeLatchManager latch() {
+ return latchMgr;
+ }
+
+ /**
* @param exchFut Optional current exchange future.
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c2f9229..b3b4f0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.function.BiFunction;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
@@ -711,7 +710,7 @@ public class GridCacheSharedContext<K, V> {
/**
* @return Ttl cleanup manager.
- * */
+ */
public GridCacheSharedTtlCleanupManager ttl() {
return ttlMgr;
}
@@ -854,10 +853,14 @@ public class GridCacheSharedContext<K, V> {
GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer);
f.add(mvcc().finishExplicitLocks(topVer));
- f.add(tm().finishTxs(topVer));
f.add(mvcc().finishAtomicUpdates(topVer));
f.add(mvcc().finishDataStreamerUpdates(topVer));
+ IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer);
+ // To properly track progress of finishing local tx updates we explicitly add this future to compound set.
+ f.add(finishLocalTxsFuture);
+ f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
+
f.markInitialized();
return f;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a5169d2..d672420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1732,7 +1732,7 @@ public class GridCacheUtils {
ver,
expiryPlc == null ? 0 : expiryPlc.forCreate(),
expiryPlc == null ? 0 : toExpireTime(expiryPlc.forCreate()),
- false,
+ true,
topVer,
GridDrType.DR_BACKUP,
true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 3d83f87..a12c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -22,11 +22,11 @@ import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridAtomicLong;
@@ -344,7 +344,7 @@ public interface IgniteCacheOffheapManager {
* @param part Partition.
* @return Number of entries.
*/
- public int totalPartitionEntriesCount(int part);
+ public long totalPartitionEntriesCount(int part);
/**
*
@@ -381,7 +381,7 @@ public interface IgniteCacheOffheapManager {
* @param cacheId Cache ID.
* @return Size.
*/
- int cacheSize(int cacheId);
+ long cacheSize(int cacheId);
/**
* @return Cache sizes if store belongs to group containing multiple caches.
@@ -391,7 +391,7 @@ public interface IgniteCacheOffheapManager {
/**
* @return Total size.
*/
- int fullSize();
+ long fullSize();
/**
* @return Update counter.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index b201935..f8cc86f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -252,7 +252,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int totalPartitionEntriesCount(int p) {
+ @Override public long totalPartitionEntriesCount(int p) {
if (grp.isLocal())
return locCacheDataStore.fullSize();
else {
@@ -1152,14 +1152,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int cacheSize(int cacheId) {
+ @Override public long cacheSize(int cacheId) {
if (grp.sharedGroup()) {
AtomicLong size = cacheSizes.get(cacheId);
return size != null ? (int)size.get() : 0;
}
- return (int)storageSize.get();
+ return storageSize.get();
}
/** {@inheritDoc} */
@@ -1176,8 +1176,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public int fullSize() {
- return (int)storageSize.get();
+ @Override public long fullSize() {
+ return storageSize.get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5bbbb31..3e3bb0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1196,6 +1196,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> partitionSizes() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
assert false : "Should not be called on non-affinity node";
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 7a47f31..ea20dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -929,7 +929,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
/**
* @return Initial update counter.
*/
- public Long initialUpdateCounter() {
+ public long initialUpdateCounter() {
return store.initialUpdateCounter();
}
@@ -948,6 +948,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * @return Total size of all caches.
+ */
+ public long fullSize() {
+ return store.fullSize();
+ }
+
+ /**
* Removes all entries and rows from this partition.
*
* @return Number of rows cleared from page memory.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7f900cb..6f68dbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -345,6 +346,11 @@ public interface GridDhtPartitionTopology {
public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
/**
+ * @return Partition cache sizes.
+ */
+ public Map<Integer, Long> partitionSizes();
+
+ /**
* @param part Partition to own.
* @return {@code True} if owned.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 538c57e..740903e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -2526,6 +2528,28 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> partitionSizes() {
+ lock.readLock().lock();
+
+ try {
+ Map<Integer, Long> partitionSizes = new HashMap<>();
+
+ for (int p = 0; p < locParts.length(); p++) {
+ GridDhtLocalPartition part = locParts.get(p);
+ if (part == null || part.fullSize() == 0)
+ continue;
+
+ partitionSizes.put(part.id(), part.fullSize());
+ }
+
+ return partitionSizes;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
AffinityTopologyVersion curTopVer = this.readyTopVer;
@@ -2587,7 +2611,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- int size = part.dataStore().fullSize();
+ long size = part.dataStore().fullSize();
if (size >= threshold)
X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
new file mode 100644
index 0000000..92a0584
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ * Class to validate partitions update counters and cache sizes during exchange process.
+ */
+public class GridDhtPartitionsStateValidator {
+ /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
+ private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+ /** Cache shared context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /**
+ * Constructor.
+ *
+ * @param cctx Cache shared context.
+ */
+ public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
+ this.cctx = cctx;
+ }
+
+ /**
+ * Validates partition states - update counters and cache sizes for all nodes.
+ * If update counter value or cache size for the same partitions are different on some nodes
+ * method throws exception with full information about inconsistent partitions.
+ *
+ * @param fut Current exchange future.
+ * @param top Topology to validate.
+ * @param messages Single messages received from all nodes.
+ * @throws IgniteCheckedException If validation failed. Exception message contains
+ * full information about all partitions which update counters or cache sizes are not consistent.
+ */
+ public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut,
+ GridDhtPartitionTopology top,
+ Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException {
+ // Ignore just joined nodes.
+ final Set<UUID> ignoringNodes = new HashSet<>();
+
+ for (DiscoveryEvent evt : fut.events().events())
+ if (evt.type() == EVT_NODE_JOINED)
+ ignoringNodes.add(evt.eventNode().id());
+
+ AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+ // Validate update counters.
+ Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+ if (!result.isEmpty())
+ throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+
+ // For sizes validation ignore also nodes which are not able to send cache sizes.
+ for (UUID id : messages.keySet()) {
+ ClusterNode node = cctx.discovery().node(id);
+ if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
+ ignoringNodes.add(id);
+ }
+
+ // Validate cache sizes.
+ result = validatePartitionsSizes(top, messages, ignoringNodes);
+ if (!result.isEmpty())
+ throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+ }
+
+ /**
+ * Validate partitions update counters for given {@code top}.
+ *
+ * @param top Topology to validate.
+ * @param messages Single messages received from all nodes.
+ * @param ignoringNodes Nodes for what we ignore validation.
+ * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+ * If map is empty validation is successful.
+ */
+ Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+ GridDhtPartitionTopology top,
+ Map<UUID, GridDhtPartitionsSingleMessage> messages,
+ Set<UUID> ignoringNodes) {
+ Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+ Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+
+ // Populate counters statistics from local node partitions.
+ for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+ if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+ continue;
+
+ updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+ }
+
+ int partitions = top.partitions();
+
+ // Then process and validate counters from other nodes.
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+ UUID nodeId = e.getKey();
+ if (ignoringNodes.contains(nodeId))
+ continue;
+
+ CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+
+ for (int part = 0; part < partitions; part++) {
+ if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+ continue;
+
+ int partIdx = countersMap.partitionIndex(part);
+ long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0;
+
+ process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter);
+ }
+ }
+
+ return invalidPartitions;
+ }
+
+ /**
+ * Validate partitions cache sizes for given {@code top}.
+ *
+ * @param top Topology to validate.
+ * @param messages Single messages received from all nodes.
+ * @param ignoringNodes Nodes for what we ignore validation.
+ * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+ * If map is empty validation is successful.
+ */
+ Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+ GridDhtPartitionTopology top,
+ Map<UUID, GridDhtPartitionsSingleMessage> messages,
+ Set<UUID> ignoringNodes) {
+ Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+ Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+
+ // Populate sizes statistics from local node partitions.
+ for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+ if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+ continue;
+
+ sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+ }
+
+ int partitions = top.partitions();
+
+ // Then process and validate sizes from other nodes.
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+ UUID nodeId = e.getKey();
+ if (ignoringNodes.contains(nodeId))
+ continue;
+
+ Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+
+ for (int part = 0; part < partitions; part++) {
+ if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+ continue;
+
+ long currentSize = sizesMap.containsKey(part) ? sizesMap.get(part) : 0L;
+
+ process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize);
+ }
+ }
+
+ return invalidPartitions;
+ }
+
+ /**
+ * Processes given {@code counter} for partition {@code part} reported by {@code node}.
+ * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+ *
+ * @param invalidPartitions Invalid partitions map.
+ * @param countersAndNodes Current map of counters and nodes by partitions.
+ * @param part Processing partition.
+ * @param node Node id.
+ * @param counter Counter value reported by {@code node}.
+ */
+ private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
+ Map<Integer, T2<UUID, Long>> countersAndNodes,
+ int part,
+ UUID node,
+ long counter) {
+ T2<UUID, Long> existingData = countersAndNodes.get(part);
+
+ if (existingData == null)
+ countersAndNodes.put(part, new T2<>(node, counter));
+
+ if (existingData != null && counter != existingData.get2()) {
+ if (!invalidPartitions.containsKey(part)) {
+ Map<UUID, Long> map = new HashMap<>();
+ map.put(existingData.get1(), existingData.get2());
+ invalidPartitions.put(part, map);
+ }
+
+ invalidPartitions.get(part).put(node, counter);
+ }
+ }
+
+ /**
+ * Folds given map of invalid partition states to string representation in the following format:
+ * Part [id]: [consistentId=value*]
+ *
+ * Value can be both update counter or cache size.
+ *
+ * @param topVer Last topology version.
+ * @param invalidPartitions Invalid partitions map.
+ * @return String representation of invalid partitions.
+ */
+ private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) {
+ SB sb = new SB();
+
+ NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions);
+
+ for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) {
+ sb.a("Part ").a(p.getKey()).a(": [");
+ for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
+ Object consistentId = cctx.discovery().node(topVer, e.getKey()).consistentId();
+ sb.a(consistentId).a("=").a(e.getValue()).a(" ");
+ }
+ sb.a("] ");
+ }
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 28cc018..0609f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -447,6 +447,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
err = e;
}
+ catch (Throwable t) {
+ fut.onDone(t);
+
+ throw t;
+ }
if (primarySync)
sendFinishReply(err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbb4985..dd4a571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -75,10 +76,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -290,6 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private GridDhtPartitionsExchangeFuture mergedWith;
+ /** Validator for partition states. */
+ @GridToStringExclude
+ private final GridDhtPartitionsStateValidator validator;
+
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
@@ -314,6 +321,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
this.exchId = exchId;
this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
+ this.validator = new GridDhtPartitionsStateValidator(cctx);
log = cctx.logger(getClass());
exchLog = cctx.logger(EXCHANGE_LOG);
@@ -1099,7 +1107,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
// To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
partHistReserved = cctx.database().reserveHistoryForExchange();
- waitPartitionRelease();
+ // On first phase we wait for finishing all local tx updates, atomic updates and lock releases.
+ waitPartitionRelease(1);
+
+ // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
+ waitPartitionRelease(2);
boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
@@ -1202,9 +1214,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* For the exact list of the objects being awaited for see
* {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
*
+ * @param phase Phase of partition release.
+ *
* @throws IgniteCheckedException If failed.
*/
- private void waitPartitionRelease() throws IgniteCheckedException {
+ private void waitPartitionRelease(int phase) throws IgniteCheckedException {
+ Latch releaseLatch = null;
+
+ // Wait for other nodes only on first phase.
+ if (phase == 1)
+ releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion());
+
IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
// Assign to class variable so it will be included into toString() method.
@@ -1238,6 +1258,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
}
}
+ catch (IgniteCheckedException e) {
+ U.warn(log,"Unable to await partitions release future", e);
+
+ throw e;
+ }
}
long waitEnd = U.currentTimeMillis();
@@ -1290,6 +1315,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
}
+
+ if (releaseLatch == null)
+ return;
+
+ releaseLatch.countDown();
+
+ if (!localJoinExchange()) {
+ try {
+ while (true) {
+ try {
+ releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS);
+
+ if (log.isInfoEnabled())
+ log.info("Finished waiting for partitions release latch: " + releaseLatch);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
+
+ // Try to resend ack.
+ releaseLatch.countDown();
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
+ }
+ }
}
/**
@@ -2499,6 +2553,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ validatePartitionsState();
+
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
@@ -2683,6 +2739,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Validates that partition update counters and cache sizes for all caches are consistent.
+ */
+ private void validatePartitionsState() {
+ for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+ CacheGroupDescriptor grpDesc = e.getValue();
+ if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
+ continue;
+
+ int grpId = e.getKey();
+
+ CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpId);
+
+ GridDhtPartitionTopology top = grpCtx != null ?
+ grpCtx.topology() :
+ cctx.exchange().clientTopology(grpId, events().discoveryCache());
+
+ // Do not validate read or write through caches or caches with disabled rebalance.
+ if (grpCtx == null
+ || grpCtx.config().isReadThrough()
+ || grpCtx.config().isWriteThrough()
+ || grpCtx.config().getCacheStoreFactory() != null
+ || grpCtx.config().getRebalanceDelay() != -1
+ || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+ continue;
+
+ try {
+ validator.validatePartitionCountersAndSizes(this, top, msgs);
+ }
+ catch (IgniteCheckedException ex) {
+ log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex);
+ // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+ }
+ }
+ }
+
+ /**
*
*/
private void assignPartitionsStates() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 215152d..6ebafac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.util.Collection;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -67,6 +67,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions counters. */
private byte[] partCntrsBytes;
+ /** Partitions sizes. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<Integer, Map<Integer, Long>> partSizes;
+
+ /** Serialized partitions counters. */
+ private byte[] partSizesBytes;
+
/** Partitions history reservation counters. */
@GridToStringInclude
@GridDirectTransient
@@ -220,6 +228,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
}
/**
+ * Adds partition sizes map for specified {@code grpId} to the current message.
+ *
+ * @param grpId Group id.
+ * @param partSizesMap Partition sizes map.
+ */
+ public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
+ if (partSizesMap.isEmpty())
+ return;
+
+ if (partSizes == null)
+ partSizes = new HashMap<>();
+
+ partSizes.put(grpId, partSizesMap);
+ }
+
+ /**
+ * Returns partition sizes map for specified {@code grpId}.
+ *
+ * @param grpId Group id.
+ * @return Partition sizes map (partId, partSize).
+ */
+ public Map<Integer, Long> partitionSizes(int grpId) {
+ if (partSizes == null)
+ return Collections.emptyMap();
+
+ return partSizes.getOrDefault(grpId, Collections.emptyMap());
+ }
+
+ /**
* @param grpId Cache group ID.
* @param cntrMap Partition history counters.
*/
@@ -287,12 +324,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
boolean marshal = (parts != null && partsBytes == null) ||
(partCntrs != null && partCntrsBytes == null) ||
(partHistCntrs != null && partHistCntrsBytes == null) ||
+ (partSizes != null && partSizesBytes == null) ||
(err != null && errBytes == null);
if (marshal) {
byte[] partsBytes0 = null;
byte[] partCntrsBytes0 = null;
byte[] partHistCntrsBytes0 = null;
+ byte[] partSizesBytes0 = null;
byte[] errBytes0 = null;
if (parts != null && partsBytes == null)
@@ -304,6 +343,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partHistCntrs != null && partHistCntrsBytes == null)
partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
+ if (partSizes != null && partSizesBytes == null)
+ partSizesBytes0 = U.marshal(ctx, partSizes);
+
if (err != null && errBytes == null)
errBytes0 = U.marshal(ctx, err);
@@ -314,11 +356,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
+ byte[] partSizesBytesZip = U.zip(partSizesBytes0);
byte[] exBytesZip = U.zip(errBytes0);
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
partHistCntrsBytes0 = partHistCntrsBytesZip;
+ partSizesBytes0 = partSizesBytesZip;
errBytes0 = exBytesZip;
compressed(true);
@@ -331,6 +375,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partsBytes = partsBytes0;
partCntrsBytes = partCntrsBytes0;
partHistCntrsBytes = partHistCntrsBytes0;
+ partSizesBytes = partSizesBytes0;
errBytes = errBytes0;
}
}
@@ -360,6 +405,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
+ if (partSizesBytes != null && partSizes == null) {
+ if (compressed())
+ partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ partSizes = U.unmarshal(ctx, partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
if (errBytes != null && err == null) {
if (compressed())
err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -451,6 +503,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
+ case 13:
+ if (!writer.writeByteArray("partsSizesBytes", partSizesBytes))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -531,6 +588,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
+ case 13:
+ partSizesBytes = reader.readByteArray("partsSizesBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -543,7 +607,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 596fa8c..42a9ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -235,7 +235,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (awaited.remove(node.id())) {
GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
- if (fullMsg0 != null) {
+ if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != null) {
assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
fullMsg = fullMsg0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
new file mode 100644
index 0000000..c205cb1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Class is responsible to create and manage instances of distributed latches {@link Latch}.
+ */
+public class ExchangeLatchManager {
+ /** Version since latch management is available. */
+ private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** Discovery manager. */
+ private final GridDiscoveryManager discovery;
+
+ /** IO manager. */
+ private final GridIoManager io;
+
+ /** Current coordinator. */
+ private volatile ClusterNode coordinator;
+
+ /** Pending acks collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+
+ /** Server latches collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+
+ /** Client latches collection. */
+ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+
+ /** Lock. */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public ExchangeLatchManager(GridKernalContext ctx) {
+ this.ctx = ctx;
+ this.log = ctx.log(getClass());
+ this.discovery = ctx.discovery();
+ this.io = ctx.io();
+
+ if (!ctx.clientNode()) {
+ ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
+ if (msg instanceof LatchAckMessage) {
+ processAck(nodeId, (LatchAckMessage) msg);
+ }
+ });
+
+ // First coordinator initialization.
+ ctx.discovery().localJoinFuture().listen(f -> {
+ this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+ });
+
+ ctx.event().addDiscoveryEventListener((e, cache) -> {
+ assert e != null;
+ assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
+
+ // Do not process from discovery thread.
+ ctx.closure().runLocalSafe(() -> processNodeLeft(e.eventNode()));
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+ }
+
+ /**
+ * Creates server latch with given {@code id} and {@code topVer}.
+ * Adds corresponding pending acks to it.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param participants Participant nodes.
+ * @return Server latch instance.
+ */
+ private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+ final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+ if (serverLatches.containsKey(latchId))
+ return serverLatches.get(latchId);
+
+ ServerLatch latch = new ServerLatch(id, topVer, participants);
+
+ serverLatches.put(latchId, latch);
+
+ if (log.isDebugEnabled())
+ log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
+
+ if (pendingAcks.containsKey(latchId)) {
+ Set<UUID> acks = pendingAcks.get(latchId);
+
+ for (UUID node : acks)
+ if (latch.hasParticipant(node) && !latch.hasAck(node))
+ latch.ack(node);
+
+ pendingAcks.remove(latchId);
+ }
+
+ if (latch.isCompleted())
+ serverLatches.remove(latchId);
+
+ return latch;
+ }
+
+ /**
+ * Creates client latch.
+ * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param coordinator Coordinator node.
+ * @param participants Participant nodes.
+ * @return Client latch instance.
+ */
+ private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+ final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+ if (clientLatches.containsKey(latchId))
+ return clientLatches.get(latchId);
+
+ ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+
+ if (log.isDebugEnabled())
+ log.debug("Client latch is created [latch=" + latchId
+ + ", crd=" + coordinator
+ + ", participantsSize=" + participants.size() + "]");
+
+ // There is final ack for created latch.
+ if (pendingAcks.containsKey(latchId)) {
+ latch.complete();
+ pendingAcks.remove(latchId);
+ }
+ else
+ clientLatches.put(latchId, latch);
+
+ return latch;
+ }
+
+ /**
+ * Creates new latch with specified {@code id} and {@code topVer} or returns existing latch.
+ *
+ * Participants of latch are calculated from given {@code topVer} as alive server nodes.
+ * If local node is coordinator {@code ServerLatch} instance will be created, otherwise {@code ClientLatch} instance.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @return Latch instance.
+ */
+ public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
+ lock.lock();
+
+ try {
+ ClusterNode coordinator = getLatchCoordinator(topVer);
+
+ if (coordinator == null) {
+ ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
+ latch.complete();
+
+ return latch;
+ }
+
+ Collection<ClusterNode> participants = getLatchParticipants(topVer);
+
+ return coordinator.isLocal()
+ ? createServerLatch(id, topVer, participants)
+ : createClientLatch(id, topVer, coordinator, participants);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param topVer Latch topology version.
+ * @return Collection of alive server nodes with latch functionality.
+ */
+ private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE
+ ? discovery.aliveServerNodes()
+ : discovery.discoCache(topVer).aliveServerNodes();
+
+ return aliveNodes
+ .stream()
+ .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @param topVer Latch topology version.
+ * @return Oldest alive server node with latch functionality.
+ */
+ @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE
+ ? discovery.aliveServerNodes()
+ : discovery.discoCache(topVer).aliveServerNodes();
+
+ return aliveNodes
+ .stream()
+ .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+ .findFirst()
+ .orElse(null);
+ }
+
+ /**
+ * Processes ack message from given {@code from} node.
+ *
+ * Completes client latch in case of final ack message.
+ *
+ * If no latch is associated with message, ack is placed to {@link #pendingAcks} set.
+ *
+ * @param from Node sent ack.
+ * @param message Ack message.
+ */
+ private void processAck(UUID from, LatchAckMessage message) {
+ lock.lock();
+
+ try {
+ ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+ if (coordinator == null)
+ return;
+
+ T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+
+ if (message.isFinal()) {
+ if (log.isDebugEnabled())
+ log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
+
+ if (clientLatches.containsKey(latchId)) {
+ ClientLatch latch = clientLatches.remove(latchId);
+ latch.complete();
+ }
+ else if (!coordinator.isLocal()) {
+ pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
+ pendingAcks.get(latchId).add(from);
+ }
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
+
+ if (serverLatches.containsKey(latchId)) {
+ ServerLatch latch = serverLatches.get(latchId);
+
+ if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+ latch.ack(from);
+
+ if (latch.isCompleted())
+ serverLatches.remove(latchId);
+ }
+ }
+ else {
+ pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
+ pendingAcks.get(latchId).add(from);
+ }
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Changes coordinator to current local node.
+ * Restores all server latches from pending acks and own client latches.
+ */
+ private void becomeNewCoordinator() {
+ if (log.isInfoEnabled())
+ log.info("Become new coordinator " + coordinator.id());
+
+ List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
+ latchesToRestore.addAll(pendingAcks.keySet());
+ latchesToRestore.addAll(clientLatches.keySet());
+
+ for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
+ String id = latchId.get1();
+ AffinityTopologyVersion topVer = latchId.get2();
+ Collection<ClusterNode> participants = getLatchParticipants(topVer);
+
+ if (!participants.isEmpty())
+ createServerLatch(id, topVer, participants);
+ }
+ }
+
+ /**
+ * Handles node left discovery event.
+ *
+ * Summary:
+ * Removes pending acks corresponds to the left node.
+ * Adds fake acknowledgements to server latches where such node was participant.
+ * Changes client latches coordinator to oldest available server node where such node was coordinator.
+ * Detects coordinator change.
+ *
+ * @param left Left node.
+ */
+ private void processNodeLeft(ClusterNode left) {
+ assert this.coordinator != null : "Coordinator is not initialized";
+
+ lock.lock();
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Process node left " + left.id());
+
+ ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+ if (coordinator == null)
+ return;
+
+ // Clear pending acks.
+ for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet())
+ if (ackEntry.getValue().contains(left.id()))
+ pendingAcks.get(ackEntry.getKey()).remove(left.id());
+
+ // Change coordinator for client latches.
+ for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+ ClientLatch latch = latchEntry.getValue();
+ if (latch.hasCoordinator(left.id())) {
+ // Change coordinator for latch and re-send ack if necessary.
+ if (latch.hasParticipant(coordinator.id()))
+ latch.newCoordinator(coordinator);
+ else {
+ /* If new coordinator is not able to take control on the latch,
+ it means that all other latch participants are left from topology
+ and there is no reason to track such latch. */
+ AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+
+ assert getLatchParticipants(topVer).isEmpty();
+
+ latch.complete(new IgniteCheckedException("All latch participants are left from topology."));
+ clientLatches.remove(latchEntry.getKey());
+ }
+ }
+ }
+
+ // Add acknowledgements from left node.
+ for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+ ServerLatch latch = latchEntry.getValue();
+
+ if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
+ if (log.isDebugEnabled())
+ log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]");
+
+ latch.ack(left.id());
+
+ if (latch.isCompleted())
+ serverLatches.remove(latchEntry.getKey());
+ }
+ }
+
+ // Coordinator is changed.
+ if (coordinator.isLocal() && this.coordinator.id() != coordinator.id()) {
+ this.coordinator = coordinator;
+
+ becomeNewCoordinator();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Latch creating on coordinator node.
+ * Latch collects acks from participants: non-coordinator nodes and current local node.
+ * Latch completes when all acks from all participants are received.
+ *
+ * After latch completion final ack is sent to all participants.
+ */
+ class ServerLatch extends CompletableLatch {
+ /** Number of latch permits. This is needed to track number of countDown invocations. */
+ private final AtomicInteger permits;
+
+ /** Set of received acks. */
+ private final Set<UUID> acks = new GridConcurrentHashSet<>();
+
+ /**
+ * Constructor.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param participants Participant nodes.
+ */
+ ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+ super(id, topVer, participants);
+ this.permits = new AtomicInteger(participants.size());
+
+ // Send final acks when latch is completed.
+ this.complete.listen(f -> {
+ for (ClusterNode node : participants) {
+ try {
+ if (discovery.alive(node)) {
+ io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL);
+
+ if (log.isDebugEnabled())
+ log.debug("Final ack is ackSent [latch=" + latchId() + ", to=" + node.id() + "]");
+ }
+ } catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Unable to send final ack [latch=" + latchId() + ", to=" + node.id() + "]");
+ }
+ }
+ });
+ }
+
+ /**
+ * Checks if latch has ack from given node.
+ *
+ * @param from Node.
+ * @return {@code true} if latch has ack from given node.
+ */
+ private boolean hasAck(UUID from) {
+ return acks.contains(from);
+ }
+
+ /**
+ * Receives ack from given node.
+ * Count downs latch if ack was not already processed.
+ *
+ * @param from Node.
+ */
+ private void ack(UUID from) {
+ if (log.isDebugEnabled())
+ log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + "]");
+
+ countDown0(from);
+ }
+
+ /**
+ * Count down latch from ack of given node.
+ * Completes latch if all acks are received.
+ *
+ * @param node Node.
+ */
+ private void countDown0(UUID node) {
+ if (isCompleted() || acks.contains(node))
+ return;
+
+ acks.add(node);
+
+ int remaining = permits.decrementAndGet();
+
+ if (log.isDebugEnabled())
+ log.debug("Count down + [latch=" + latchId() + ", remaining=" + remaining + "]");
+
+ if (remaining == 0)
+ complete();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void countDown() {
+ countDown0(ctx.localNodeId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ Set<UUID> pendingAcks = participants.stream().filter(ack -> !acks.contains(ack)).collect(Collectors.toSet());
+
+ return S.toString(ServerLatch.class, this,
+ "pendingAcks", pendingAcks,
+ "super", super.toString());
+ }
+ }
+
+ /**
+ * Latch creating on non-coordinator node.
+ * Latch completes when final ack from coordinator is received.
+ */
+ class ClientLatch extends CompletableLatch {
+ /** Latch coordinator node. Can be changed if coordinator is left from topology. */
+ private volatile ClusterNode coordinator;
+
+ /** Flag indicates that ack is sent to coordinator. */
+ private boolean ackSent;
+
+ /**
+ * Constructor.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param coordinator Coordinator node.
+ * @param participants Participant nodes.
+ */
+ ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+ super(id, topVer, participants);
+
+ this.coordinator = coordinator;
+ }
+
+ /**
+ * Checks if latch coordinator is given {@code node}.
+ *
+ * @param node Node.
+ * @return {@code true} if latch coordinator is given node.
+ */
+ private boolean hasCoordinator(UUID node) {
+ return coordinator.id().equals(node);
+ }
+
+ /**
+ * Changes coordinator of latch and resends ack to new coordinator if needed.
+ *
+ * @param coordinator New coordinator.
+ */
+ private void newCoordinator(ClusterNode coordinator) {
+ if (log.isDebugEnabled())
+ log.debug("Coordinator is changed [latch=" + latchId() + ", crd=" + coordinator.id() + "]");
+
+ synchronized (this) {
+ this.coordinator = coordinator;
+
+ // Resend ack to new coordinator.
+ if (ackSent)
+ sendAck();
+ }
+ }
+
+ /**
+ * Sends ack to coordinator node.
+ * There is ack deduplication on coordinator. So it's fine to send same ack twice.
+ */
+ private void sendAck() {
+ try {
+ ackSent = true;
+
+ io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL);
+
+ if (log.isDebugEnabled())
+ log.debug("Ack is ackSent + [latch=" + latchId() + ", to=" + coordinator.id() + "]");
+ } catch (IgniteCheckedException e) {
+ // Coordinator is unreachable. On coodinator node left discovery event ack will be resent.
+ if (log.isDebugEnabled())
+ log.debug("Unable to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void countDown() {
+ if (isCompleted())
+ return;
+
+ // Synchronize in case of changed coordinator.
+ synchronized (this) {
+ sendAck();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientLatch.class, this,
+ "super", super.toString());
+ }
+ }
+
+ /**
+ * Base latch functionality with implemented complete / await logic.
+ */
+ private abstract static class CompletableLatch implements Latch {
+ /** Latch id. */
+ @GridToStringInclude
+ protected final String id;
+
+ /** Latch topology version. */
+ @GridToStringInclude
+ protected final AffinityTopologyVersion topVer;
+
+ /** Latch node participants. Only participant nodes are able to change state of latch. */
+ @GridToStringExclude
+ protected final Set<UUID> participants;
+
+ /** Future indicates that latch is completed. */
+ @GridToStringExclude
+ protected final GridFutureAdapter<?> complete = new GridFutureAdapter<>();
+
+ /**
+ * Constructor.
+ *
+ * @param id Latch id.
+ * @param topVer Latch topology version.
+ * @param participants Participant nodes.
+ */
+ CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+ this.id = id;
+ this.topVer = topVer;
+ this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void await() throws IgniteCheckedException {
+ complete.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException {
+ complete.get(timeout, timeUnit);
+ }
+
+ /**
+ * Checks if latch participants contain given {@code node}.
+ *
+ * @param node Node.
+ * @return {@code true} if latch participants contain given node.
+ */
+ boolean hasParticipant(UUID node) {
+ return participants.contains(node);
+ }
+
+ /**
+ * @return {@code true} if latch is completed.
+ */
+ boolean isCompleted() {
+ return complete.isDone();
+ }
+
+ /**
+ * Completes current latch.
+ */
+ void complete() {
+ complete.onDone();
+ }
+
+ /**
+ * Completes current latch with given {@code error}.
+ *
+ * @param error Error.
+ */
+ void complete(Throwable error) {
+ complete.onDone(error);
+ }
+
+ /**
+ * @return Full latch id.
+ */
+ String latchId() {
+ return id + "-" + topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CompletableLatch.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
new file mode 100644
index 0000000..9704c2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Simple distributed count down latch interface.
+ * Latch supports count down and await logic.
+ * Latch functionality is not relied on caches and has own state management {@link ExchangeLatchManager}.
+ */
+public interface Latch {
+ /**
+ * Decrements count on current latch.
+ * Release all latch waiters on all nodes if count reaches zero.
+ *
+ * This is idempotent operation. Invoking this method twice or more on the same node doesn't have any effect.
+ */
+ void countDown();
+
+ /**
+ * Awaits current latch completion.
+ *
+ * @throws IgniteCheckedException If await is failed.
+ */
+ void await() throws IgniteCheckedException;
+
+ /**
+ * Awaits current latch completion with specified timeout.
+ *
+ * @param timeout Timeout value.
+ * @param timeUnit Timeout time unit.
+ * @throws IgniteCheckedException If await is failed.
+ */
+ void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException;
+}
[06/10] ignite git commit: IGNITE-7830: Knn Lin Reg with new datasets
Posted by ak...@apache.org.
IGNITE-7830: Knn Lin Reg with new datasets
this closes #3583
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4653b7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4653b7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4653b7c
Branch: refs/heads/ignite-8201
Commit: a4653b7c1287a039206bf22e9d85125bb15bc412
Parents: 6557fe6
Author: zaleslaw <za...@gmail.com>
Authored: Wed Apr 11 12:31:48 2018 +0300
Committer: YuriBabak <y....@gmail.com>
Committed: Wed Apr 11 12:31:48 2018 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/ml/knn/KNNUtils.java | 59 ++++++++
.../KNNClassificationTrainer.java | 23 +--
.../ml/knn/regression/KNNRegressionModel.java | 87 +++++++++++
.../ml/knn/regression/KNNRegressionTrainer.java | 40 ++++++
.../ignite/ml/knn/regression/package-info.java | 22 +++
.../apache/ignite/ml/knn/KNNRegressionTest.java | 143 +++++++++++++++++++
.../org/apache/ignite/ml/knn/KNNTestSuite.java | 1 +
7 files changed, 354 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
new file mode 100644
index 0000000..88fa70f
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/KNNUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn;
+
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.structures.LabeledDataset;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.structures.partition.LabeledDatasetPartitionDataBuilderOnHeap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for KNNRegression.
+ */
+public class KNNUtils {
+ /**
+ * Builds dataset.
+ *
+ * @param datasetBuilder Dataset builder.
+ * @param featureExtractor Feature extractor.
+ * @param lbExtractor Label extractor.
+ * @return Dataset.
+ */
+ @Nullable public static <K, V> Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> buildDataset(DatasetBuilder<K, V> datasetBuilder, IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
+ PartitionDataBuilder<K, V, KNNPartitionContext, LabeledDataset<Double, LabeledVector>> partDataBuilder
+ = new LabeledDatasetPartitionDataBuilderOnHeap<>(
+ featureExtractor,
+ lbExtractor
+ );
+
+ Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset = null;
+
+ if (datasetBuilder != null) {
+ dataset = datasetBuilder.build(
+ (upstream, upstreamSize) -> new KNNPartitionContext(),
+ partDataBuilder
+ );
+ }
+ return dataset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
index 357047f..c0c8e65 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/classification/KNNClassificationTrainer.java
@@ -17,14 +17,9 @@
package org.apache.ignite.ml.knn.classification;
-import org.apache.ignite.ml.dataset.Dataset;
import org.apache.ignite.ml.dataset.DatasetBuilder;
-import org.apache.ignite.ml.dataset.PartitionDataBuilder;
-import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.knn.KNNUtils;
import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.structures.LabeledDataset;
-import org.apache.ignite.ml.structures.partition.LabeledDatasetPartitionDataBuilderOnHeap;
-import org.apache.ignite.ml.structures.LabeledVector;
import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
/**
@@ -41,20 +36,6 @@ public class KNNClassificationTrainer implements SingleLabelDatasetTrainer<KNNCl
*/
@Override public <K, V> KNNClassificationModel fit(DatasetBuilder<K, V> datasetBuilder,
IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
- PartitionDataBuilder<K, V, KNNPartitionContext, LabeledDataset<Double, LabeledVector>> partDataBuilder
- = new LabeledDatasetPartitionDataBuilderOnHeap<>(
- featureExtractor,
- lbExtractor
- );
-
- Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset = null;
-
- if (datasetBuilder != null) {
- dataset = datasetBuilder.build(
- (upstream, upstreamSize) -> new KNNPartitionContext(),
- partDataBuilder
- );
- }
- return new KNNClassificationModel<>(dataset);
+ return new KNNClassificationModel<>(KNNUtils.buildDataset(datasetBuilder, featureExtractor, lbExtractor));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
new file mode 100644
index 0000000..cabc143
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionModel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.ml.knn.regression;
+
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.knn.classification.KNNClassificationModel;
+import org.apache.ignite.ml.knn.partitions.KNNPartitionContext;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
+import org.apache.ignite.ml.structures.LabeledDataset;
+import org.apache.ignite.ml.structures.LabeledVector;
+
+import java.util.List;
+
+/**
+ * This class provides kNN Multiple Linear Regression or Locally [weighted] regression (Simple and Weighted versions).
+ *
+ * <p> This is an instance-based learning method. </p>
+ *
+ * <ul>
+ * <li>Local means using nearby points (i.e. a nearest neighbors approach).</li>
+ * <li>Weighted means we value points based upon how far away they are.</li>
+ * <li>Regression means approximating a function.</li>
+ * </ul>
+ */
+public class KNNRegressionModel<K,V> extends KNNClassificationModel<K,V> {
+ /**
+ * Builds the model via prepared dataset.
+ * @param dataset Specially prepared object to run algorithm over it.
+ */
+ public KNNRegressionModel(Dataset<KNNPartitionContext, LabeledDataset<Double, LabeledVector>> dataset) {
+ super(dataset);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Double apply(Vector v) {
+ List<LabeledVector> neighbors = findKNearestNeighbors(v);
+
+ return predictYBasedOn(neighbors, v);
+ }
+
+ /** */
+ private double predictYBasedOn(List<LabeledVector> neighbors, Vector v) {
+ switch (stgy) {
+ case SIMPLE:
+ return simpleRegression(neighbors);
+ case WEIGHTED:
+ return weightedRegression(neighbors, v);
+ default:
+ throw new UnsupportedOperationException("Strategy " + stgy.name() + " is not supported");
+ }
+ }
+
+ /** */
+ private double weightedRegression(List<LabeledVector> neighbors, Vector v) {
+ double sum = 0.0;
+ double div = 0.0;
+ for (LabeledVector<Vector, Double> neighbor : neighbors) {
+ double distance = distanceMeasure.compute(v, neighbor.features());
+ sum += neighbor.label() * distance;
+ div += distance;
+ }
+ return sum / div;
+ }
+
+ /** */
+ private double simpleRegression(List<LabeledVector> neighbors) {
+ double sum = 0.0;
+ for (LabeledVector<Vector, Double> neighbor : neighbors)
+ sum += neighbor.label();
+ return sum / (double)k;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
new file mode 100644
index 0000000..2d13cd5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn.regression;
+
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.knn.KNNUtils;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * kNN algorithm trainer to solve regression task.
+ */
+public class KNNRegressionTrainer{
+ /**
+ * Trains model based on the specified data.
+ *
+ * @param datasetBuilder Dataset builder.
+ * @param featureExtractor Feature extractor.
+ * @param lbExtractor Label extractor.
+ * @return Model.
+ */
+ public <K, V> KNNRegressionModel fit(DatasetBuilder<K, V> datasetBuilder,
+ IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
+ return new KNNRegressionModel<>(KNNUtils.buildDataset(datasetBuilder, featureExtractor, lbExtractor));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
new file mode 100644
index 0000000..82e7192
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains helper classes for kNN regression algorithms.
+ */
+package org.apache.ignite.ml.knn.regression;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
new file mode 100644
index 0000000..66dbca9
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNRegressionTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.knn;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
+import org.apache.ignite.ml.knn.classification.KNNStrategy;
+import org.apache.ignite.ml.knn.regression.KNNRegressionModel;
+import org.apache.ignite.ml.knn.regression.KNNRegressionTrainer;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.distances.EuclideanDistance;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link KNNRegressionTrainer}.
+ */
+public class KNNRegressionTest extends BaseKNNTest {
+ /** */
+ private double[] y;
+
+ /** */
+ private double[][] x;
+
+ /** */
+ public void testSimpleRegressionWithOneNeighbour() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ Map<Integer, double[]> data = new HashMap<>();
+ data.put(0, new double[] {11.0, 0, 0, 0, 0, 0});
+ data.put(1, new double[] {12.0, 2.0, 0, 0, 0, 0});
+ data.put(2, new double[] {13.0, 0, 3.0, 0, 0, 0});
+ data.put(3, new double[] {14.0, 0, 0, 4.0, 0, 0});
+ data.put(4, new double[] {15.0, 0, 0, 0, 5.0, 0});
+ data.put(5, new double[] {16.0, 0, 0, 0, 0, 6.0});
+
+ KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+ KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+ new LocalDatasetBuilder<>(data, 2),
+ (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+ (k, v) -> v[0]
+ ).withK(1)
+ .withDistanceMeasure(new EuclideanDistance())
+ .withStrategy(KNNStrategy.SIMPLE);
+
+ Vector vector = new DenseLocalOnHeapVector(new double[] {0, 0, 0, 5.0, 0.0});
+ System.out.println(knnMdl.apply(vector));
+ Assert.assertEquals(15, knnMdl.apply(vector), 1E-12);
+ }
+
+ /** */
+ public void testLongly() {
+
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ Map<Integer, double[]> data = new HashMap<>();
+ data.put(0, new double[] {60323, 83.0, 234289, 2356, 1590, 107608, 1947});
+ data.put(1, new double[] {61122, 88.5, 259426, 2325, 1456, 108632, 1948});
+ data.put(2, new double[] {60171, 88.2, 258054, 3682, 1616, 109773, 1949});
+ data.put(3, new double[] {61187, 89.5, 284599, 3351, 1650, 110929, 1950});
+ data.put(4, new double[] {63221, 96.2, 328975, 2099, 3099, 112075, 1951});
+ data.put(5, new double[] {63639, 98.1, 346999, 1932, 3594, 113270, 1952});
+ data.put(6, new double[] {64989, 99.0, 365385, 1870, 3547, 115094, 1953});
+ data.put(7, new double[] {63761, 100.0, 363112, 3578, 3350, 116219, 1954});
+ data.put(8, new double[] {66019, 101.2, 397469, 2904, 3048, 117388, 1955});
+ data.put(9, new double[] {68169, 108.4, 442769, 2936, 2798, 120445, 1957});
+ data.put(10, new double[] {66513, 110.8, 444546, 4681, 2637, 121950, 1958});
+ data.put(11, new double[] {68655, 112.6, 482704, 3813, 2552, 123366, 1959});
+ data.put(12, new double[] {69564, 114.2, 502601, 3931, 2514, 125368, 1960});
+ data.put(13, new double[] {69331, 115.7, 518173, 4806, 2572, 127852, 1961});
+ data.put(14, new double[] {70551, 116.9, 554894, 4007, 2827, 130081, 1962});
+
+ KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+ KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+ new LocalDatasetBuilder<>(data, 2),
+ (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+ (k, v) -> v[0]
+ ).withK(3)
+ .withDistanceMeasure(new EuclideanDistance())
+ .withStrategy(KNNStrategy.SIMPLE);
+
+ Vector vector = new DenseLocalOnHeapVector(new double[] {104.6, 419180, 2822, 2857, 118734, 1956});
+ System.out.println(knnMdl.apply(vector));
+ Assert.assertEquals(67857, knnMdl.apply(vector), 2000);
+ }
+
+ /** */
+ public void testLonglyWithWeightedStrategy() {
+ IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+ Map<Integer, double[]> data = new HashMap<>();
+ data.put(0, new double[] {60323, 83.0, 234289, 2356, 1590, 107608, 1947});
+ data.put(1, new double[] {61122, 88.5, 259426, 2325, 1456, 108632, 1948});
+ data.put(2, new double[] {60171, 88.2, 258054, 3682, 1616, 109773, 1949});
+ data.put(3, new double[] {61187, 89.5, 284599, 3351, 1650, 110929, 1950});
+ data.put(4, new double[] {63221, 96.2, 328975, 2099, 3099, 112075, 1951});
+ data.put(5, new double[] {63639, 98.1, 346999, 1932, 3594, 113270, 1952});
+ data.put(6, new double[] {64989, 99.0, 365385, 1870, 3547, 115094, 1953});
+ data.put(7, new double[] {63761, 100.0, 363112, 3578, 3350, 116219, 1954});
+ data.put(8, new double[] {66019, 101.2, 397469, 2904, 3048, 117388, 1955});
+ data.put(9, new double[] {68169, 108.4, 442769, 2936, 2798, 120445, 1957});
+ data.put(10, new double[] {66513, 110.8, 444546, 4681, 2637, 121950, 1958});
+ data.put(11, new double[] {68655, 112.6, 482704, 3813, 2552, 123366, 1959});
+ data.put(12, new double[] {69564, 114.2, 502601, 3931, 2514, 125368, 1960});
+ data.put(13, new double[] {69331, 115.7, 518173, 4806, 2572, 127852, 1961});
+ data.put(14, new double[] {70551, 116.9, 554894, 4007, 2827, 130081, 1962});
+
+ KNNRegressionTrainer trainer = new KNNRegressionTrainer();
+
+ KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
+ new LocalDatasetBuilder<>(data, 2),
+ (k, v) -> Arrays.copyOfRange(v, 1, v.length),
+ (k, v) -> v[0]
+ ).withK(3)
+ .withDistanceMeasure(new EuclideanDistance())
+ .withStrategy(KNNStrategy.SIMPLE);
+
+ Vector vector = new DenseLocalOnHeapVector(new double[] {104.6, 419180, 2822, 2857, 118734, 1956});
+ System.out.println(knnMdl.apply(vector));
+ Assert.assertEquals(67857, knnMdl.apply(vector), 2000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a4653b7c/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
index 95ebec5..55ef24e 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/knn/KNNTestSuite.java
@@ -26,6 +26,7 @@ import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses({
KNNClassificationTest.class,
+ KNNRegressionTest.class,
LabeledDatasetTest.class
})
public class KNNTestSuite {
[04/10] ignite git commit: IGNITE-7222 .NET: Ignore missing
IgniteConfiguration.CommunicationFailureResolver
Posted by ak...@apache.org.
IGNITE-7222 .NET: Ignore missing IgniteConfiguration.CommunicationFailureResolver
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780fc07b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780fc07b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780fc07b
Branch: refs/heads/ignite-8201
Commit: 780fc07be0b257b578647682585c89548e6d695d
Parents: da77b98
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Wed Apr 11 11:51:45 2018 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Wed Apr 11 11:51:45 2018 +0300
----------------------------------------------------------------------
.../ApiParity/IgniteConfigurationParityTest.cs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/780fc07b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
index d68083f..bf34fc0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
@@ -63,7 +63,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
"ClassLoader",
"CacheStoreSessionListenerFactories",
"PlatformConfiguration",
- "ExecutorConfiguration"
+ "ExecutorConfiguration",
+ "CommunicationFailureResolver"
};
/** Properties that are missing on .NET side. */