You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/07 12:52:40 UTC

[5/5] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f50c7ccb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f50c7ccb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f50c7ccb

Branch: refs/heads/ignite-zk
Commit: f50c7ccb5e80731426446c2051231fd55c912d23
Parents: c0dfb20
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 7 13:49:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 7 15:49:31 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  7 +++++
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 32 +++++++++++++++++---
 .../discovery/zk/internal/ZookeeperClient.java  |  4 ++-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  5 +--
 .../ignite/internal/GridDiscoverySelfTest.java  |  5 +--
 .../internal/GridSameVmStartupSelfTest.java     | 19 +++++++++---
 .../binary/BinaryMetadataUpdatesFlowTest.java   |  6 ++++
 8 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 30d3d26..4f481fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -413,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
             assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
                 "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ", evt=" + evt + ']';
 
             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 662a2b9..e03e2cf 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2100,6 +2100,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         }
     }
 
+    /**
+     * @return Bound TCP server port.
+     */
+    public int boundPort() {
+        return boundTcpPort;
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
         assert locHost != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index bd7f427..b9c2431 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -42,6 +43,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration;
 import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
@@ -74,7 +76,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
     /** */
     @GridToStringInclude
-    private int sesTimeout = 10_000;
+    private int sesTimeout;
 
     /** */
     @GridToStringInclude
@@ -202,8 +204,22 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                 if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT))
                     consistentId = U.consistentId(sortedAddrs);
                 else {
-                    Integer commPort = (Integer)locNodeAttrs.get(
-                        TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
+                    Integer commPort = null;
+
+                    if (locNodeAttrs != null) {
+                        commPort = (Integer)locNodeAttrs.get(
+                            TcpCommunicationSpi.class.getSimpleName() + "." + TcpCommunicationSpi.ATTR_PORT);
+                    }
+                    else {
+                        CommunicationSpi commSpi = ignite.configuration().getCommunicationSpi();
+
+                        if (commSpi instanceof TcpCommunicationSpi) {
+                            commPort = ((TcpCommunicationSpi)commSpi).boundPort();
+
+                            if (commPort == -1)
+                                commPort = null;
+                        }
+                    }
 
                     if (commPort == null) {
                         U.warn(log, "Can not initialize default consistentId, TcpCommunicationSpi port is not initialized.");
@@ -339,11 +355,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+        if (sesTimeout == 0)
+            sesTimeout = ignite.configuration().getFailureDetectionTimeout().intValue();
+
+        assertParameter(sesTimeout > 0, "sessionTimeout > 0");
+        A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be empty");
+
         ZookeeperClusterNode locNode = initLocalNode();
 
         log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString +
-            ", sesTimeout=" + sesTimeout +
-            ", rootPath=" + zkRootPath + ']');
+            ", sessionTimeout=" + sesTimeout +
+            ", zkRootPath=" + zkRootPath + ']');
 
         impl = new ZookeeperDiscoveryImpl(
             this,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 0d81cb1..a0bc2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.zookeeper.AsyncCallback;
@@ -42,7 +43,8 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ZookeeperClient implements Watcher {
     /** */
-    private static final long RETRY_TIMEOUT = 1000;
+    private static final long RETRY_TIMEOUT =
+        IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 1000);
 
     /** */
     private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 93015ed..2e97aed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -230,7 +230,8 @@ public class ZookeeperDiscoveryImpl {
      */
     public boolean pingNode(UUID nodeId) {
         // TODO ZK
-        checkState();
+        if (connState == ConnectionState.DISCONNECTED)
+            throw new IgniteClientDisconnectedException(null, "Client is disconnected.");
 
         return node(nodeId) != null;
     }
@@ -950,7 +951,7 @@ public class ZookeeperDiscoveryImpl {
         assert rtState.crd;
 
         if (log.isInfoEnabled())
-            log.info("Process alive nodes change: " + aliveNodes.size());
+            log.info("Process alive nodes change [alives=" + aliveNodes.size() + "]");
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index e6b678b..9af2df3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.lang.IgniteProductVersion.fromString;
@@ -171,7 +172,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
 
                     joinedCnt.countDown();
                 }
-                else if (EVT_NODE_LEFT == evt.type()) {
+                else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) {
                     int i = cnt.decrementAndGet();
 
                     assert i >= 0;
@@ -185,7 +186,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
             }
         };
 
-        ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED);
+        ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED);
 
         try {
             for (int i = 0; i < NODES_CNT; i++)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
index 66e9cf4..a04c38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -75,8 +76,10 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
 
             ignite2.events().localListen(new IgnitePredicate<Event>() {
                 @Override public boolean apply(Event evt) {
-                    assert evt.type() != EVT_NODE_FAILED :
-                        "Node1 did not exit gracefully.";
+                    boolean tcpDiscovery = tcpDiscovery();
+
+                    if (tcpDiscovery)
+                        assert evt.type() != EVT_NODE_FAILED : "Node1 did not exit gracefully.";
 
                     if (evt instanceof DiscoveryEvent) {
                         // Local node can send METRICS_UPDATED event.
@@ -86,8 +89,14 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
                                 ((DiscoveryEvent) evt).eventNode().id() + ", expected=" + grid1LocNodeId +
                                 ", type=" + evt.type() + ']';
 
-                        if (evt.type() == EVT_NODE_LEFT)
-                            latch.countDown();
+                        if (tcpDiscovery) {
+                            if (evt.type() == EVT_NODE_LEFT)
+                                latch.countDown();
+                        }
+                        else {
+                            if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+                                latch.countDown();
+                        }
                     }
 
                     return true;
@@ -96,7 +105,7 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
 
             stopGrid(1);
 
-            latch.await();
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
 
             Collection<ClusterNode> top2 = ignite2.cluster().forRemotes().nodes();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f50c7ccb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
index 3ee51c8..1f88bb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -316,6 +316,9 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
     public void testFlowNoConflictsWithClients() throws Exception {
         startComputation(0, stopFlag0);
 
+        if (!tcpDiscovery())
+            return;
+
         startComputation(1, stopFlag1);
 
         startComputation(2, stopFlag2);
@@ -617,6 +620,9 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
             while (!updatesQueue.isEmpty()) {
                 BinaryUpdateDescription desc = updatesQueue.poll();
 
+                if (desc == null)
+                    break;
+
                 BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
 
                 BinaryObject bo = newBinaryObject(builder, desc);