You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/07 12:52:40 UTC
[5/5] ignite git commit: zk
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f50c7ccb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f50c7ccb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f50c7ccb
Branch: refs/heads/ignite-zk
Commit: f50c7ccb5e80731426446c2051231fd55c912d23
Parents: c0dfb20
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 7 13:49:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 7 15:49:31 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 7 +++++
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 32 +++++++++++++++++---
.../discovery/zk/internal/ZookeeperClient.java | 4 ++-
.../zk/internal/ZookeeperDiscoveryImpl.java | 5 +--
.../ignite/internal/GridDiscoverySelfTest.java | 5 +--
.../internal/GridSameVmStartupSelfTest.java | 19 +++++++++---
.../binary/BinaryMetadataUpdatesFlowTest.java | 6 ++++
8 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 30d3d26..4f481fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -413,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
"Node joined with smaller-than-local " +
- "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+ "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ", evt=" + evt + ']';
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 662a2b9..e03e2cf 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2100,6 +2100,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ /**
+ * Exposes the value of the {@code boundTcpPort} field of this SPI.
+ *
+ * NOTE(review): the ZookeeperDiscoverySpi caller added in this same commit treats a
+ * returned {@code -1} as "port is not bound yet" - confirm that is the field's unset value.
+ *
+ * @return Bound TCP server port.
+ */
+ public int boundPort() {
+ return boundTcpPort;
+ }
+
/** {@inheritDoc} */
@Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
assert locHost != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bd7f427..b9c2431 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -42,6 +43,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
@@ -74,7 +76,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** */
@GridToStringInclude
- private int sesTimeout = 10_000;
+ private int sesTimeout;
/** */
@GridToStringInclude
@@ -202,8 +204,22 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT))
consistentId = U.consistentId(sortedAddrs);
else {
- Integer commPort = (Integer)locNodeAttrs.get(
- TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
+ Integer commPort = null;
+
+ if (locNodeAttrs != null) {
+ commPort = (Integer)locNodeAttrs.get(
+ TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
+ }
+ else {
+ CommunicationSpi commSpi = ignite.configuration().getCommunicationSpi();
+
+ if (commSpi instanceof TcpCommunicationSpi) {
+ commPort = ((TcpCommunicationSpi)commSpi).boundPort();
+
+ if (commPort == -1)
+ commPort = null;
+ }
+ }
if (commPort == null) {
U.warn(log, "Can not initialize default consistentId, TcpCommunicationSpi port is not initialized.");
@@ -339,11 +355,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+ if (sesTimeout == 0)
+ sesTimeout = ignite.configuration().getFailureDetectionTimeout().intValue();
+
+ assertParameter(sesTimeout > 0, "sessionTimeout > 0");
+ A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be empty");
+
ZookeeperClusterNode locNode = initLocalNode();
log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
- ", sesTimeout=" + sesTimeout +
- ", rootPath=" + zkRootPath + ']');
+ ", sessionTimeout=" + sesTimeout +
+ ", zkRootPath=" + zkRootPath + ']');
impl = new ZookeeperDiscoveryImpl(
this,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 0d81cb1..a0bc2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.zookeeper.AsyncCallback;
@@ -42,7 +43,8 @@ import org.jetbrains.annotations.Nullable;
*/
public class ZookeeperClient implements Watcher {
/** */
- private static final long RETRY_TIMEOUT = 1000;
+ private static final long RETRY_TIMEOUT =
+ IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 1000);
/** */
private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 93015ed..2e97aed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -230,7 +230,8 @@ public class ZookeeperDiscoveryImpl {
*/
public boolean pingNode(UUID nodeId) {
// TODO ZK
- checkState();
+ if (connState == ConnectionState.DISCONNECTED)
+ throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
return node(nodeId) != null;
}
@@ -950,7 +951,7 @@ public class ZookeeperDiscoveryImpl {
assert rtState.crd;
if (log.isInfoEnabled())
- log.info("Process alive nodes change: " + aliveNodes.size());
+ log.info("Process alive nodes change [alives=" + aliveNodes.size() + "]");
TreeMap<Integer, String> alives = new TreeMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index e6b678b..9af2df3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.lang.IgniteProductVersion.fromString;
@@ -171,7 +172,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
joinedCnt.countDown();
}
- else if (EVT_NODE_LEFT == evt.type()) {
+ else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) {
int i = cnt.decrementAndGet();
assert i >= 0;
@@ -185,7 +186,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
}
};
- ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED);
+ ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED);
try {
for (int i = 0; i < NODES_CNT; i++)
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
index 66e9cf4..a04c38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -75,8 +76,10 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
ignite2.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
- assert evt.type() != EVT_NODE_FAILED :
- "Node1 did not exit gracefully.";
+ boolean tcpDiscovery = tcpDiscovery();
+
+ if (tcpDiscovery)
+ assert evt.type() != EVT_NODE_FAILED : "Node1 did not exit gracefully.";
if (evt instanceof DiscoveryEvent) {
// Local node can send METRICS_UPDATED event.
@@ -86,8 +89,14 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
((DiscoveryEvent) evt).eventNode().id() + ", expected=" + grid1LocNodeId +
", type=" + evt.type() + ']';
- if (evt.type() == EVT_NODE_LEFT)
- latch.countDown();
+ if (tcpDiscovery) {
+ if (evt.type() == EVT_NODE_LEFT)
+ latch.countDown();
+ }
+ else {
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+ latch.countDown();
+ }
}
return true;
@@ -96,7 +105,7 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
stopGrid(1);
- latch.await();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
Collection<ClusterNode> top2 = ignite2.cluster().forRemotes().nodes();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
index 3ee51c8..1f88bb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -316,6 +316,9 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
public void testFlowNoConflictsWithClients() throws Exception {
startComputation(0, stopFlag0);
+ if (!tcpDiscovery())
+ return;
+
startComputation(1, stopFlag1);
startComputation(2, stopFlag2);
@@ -617,6 +620,9 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
while (!updatesQueue.isEmpty()) {
BinaryUpdateDescription desc = updatesQueue.poll();
+ if (desc == null)
+ break;
+
BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
BinaryObject bo = newBinaryObject(builder, desc);