You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/01/19 19:02:12 UTC

[ignite] branch master updated: IGNITE-13567 Fixed incorrect value of the joiningNodeClient flag (#8350)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 28c738a  IGNITE-13567 Fixed incorrect value of the joiningNodeClient flag (#8350)
28c738a is described below

commit 28c738a1889ef67e69d63425f995b3c549583425
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Tue Jan 19 22:00:27 2021 +0300

    IGNITE-13567 Fixed incorrect value of the joiningNodeClient flag (#8350)
---
 .../ignite/spi/discovery/tcp/ClientImpl.java       |   9 +-
 .../discovery/DiscoverySpiDataExchangeTest.java    | 267 +++++++++++++++++++++
 .../FilterDataForClientNodeDiscoveryTest.java      |   4 +-
 .../IgniteSpiDiscoverySelfTestSuite.java           |   5 +-
 .../zk/ZookeeperDiscoverySpiTestSuite4.java        |   4 +-
 5 files changed, 283 insertions(+), 6 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5017e18..55adeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -763,8 +763,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         marshalCredentials(node);
                     }
 
-                    if (discoveryData == null)
-                        discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
+                    if (discoveryData == null) {
+                        DiscoveryDataPacket dataPacket = new DiscoveryDataPacket(getLocalNodeId());
+
+                        dataPacket.joiningNodeClient(true);
+
+                        discoveryData = spi.collectExchangeData(dataPacket);
+                    }
 
                     TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
new file mode 100644
index 0000000..44eef02
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+/**
+ * Checks the joining-node-client flag on discovery data bags observed through a delegating discovery SPI wrapper.
+ */
+public class DiscoverySpiDataExchangeTest extends GridCommonAbstractTest {
+    /** Callback given the local node and data bag on each collect(); NOTE(review): non-volatile, confirm visibility. */
+    private BiConsumer<ClusterNode, DiscoveryDataBag> dataExchangeCollectClosure;
+
+    /** {@inheritDoc} Additionally wraps the discovery SPI taken from {@code cfg} into a {@link DelegatedDiscoverySpi}. */
+    @Override protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException {
+        IgniteConfiguration cfg0 = super.optimize(cfg);
+
+        cfg0.setDiscoverySpi(new DelegatedDiscoverySpi((IgniteDiscoverySpi)cfg.getDiscoverySpi()));
+
+        return cfg0;
+    }
+
+    /**
+     * Ensures that {@link DiscoveryDataBag#isJoiningNodeClient()} matches the client flag of the node just started.
+     * Starts four nodes, {@code startGrid} at even and {@code startClientGrid} at odd indexes, checking bags each time.
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testJoiningNodeClientFlag() throws Exception {
+        for (int nodeIdx = 0; nodeIdx < 4; nodeIdx++) {
+            Collection<T2<ClusterNode, DiscoveryDataBag>> dataBags = new ConcurrentLinkedQueue<>();
+
+            dataExchangeCollectClosure = (locNode, dataBag) -> dataBags.add(new T2<>(locNode, dataBag));
+
+            IgniteEx node = nodeIdx % 2 == 0 ? startGrid(nodeIdx) : startClientGrid(nodeIdx);
+
+            assertFalse(dataBags.isEmpty());
+
+            assertTrue(dataBags.toString(),
+                dataBags.stream().allMatch(pair -> node.context().clientNode() == pair.get2().isJoiningNodeClient()));
+        }
+    }
+
+    /** Discovery SPI that forwards calls to a delegate and wraps the data exchange so that collect() can be observed. */
+    @IgniteSpiMultipleInstancesSupport(true)
+    @DiscoverySpiHistorySupport(true)
+    private class DelegatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi {
+        /** Discovery delegate. */
+        private final IgniteDiscoverySpi delegate;
+
+        /**
+         * @param delegate Discovery delegate.
+         */
+        DelegatedDiscoverySpi(IgniteDiscoverySpi delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Serializable consistentId() throws IgniteSpiException {
+            return delegate.consistentId();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> getRemoteNodes() {
+            return delegate.getRemoteNodes();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode getLocalNode() {
+            return delegate.getLocalNode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable ClusterNode getNode(UUID nodeId) {
+            return delegate.getNode(nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean pingNode(UUID nodeId) {
+            return delegate.pingNode(nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+            delegate.setNodeAttributes(attrs, ver);
+        }
+
+        /** {@inheritDoc} */
+        @Deprecated
+        @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+            delegate.setListener(lsnr);
+        }
+
+        /** {@inheritDoc} Hands the delegate the exchange wrapped in a {@link DelegatedDiscoverySpiDataExchange}. */
+        @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+            delegate.setDataExchange(new DelegatedDiscoverySpiDataExchange(exchange));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+            delegate.setMetricsProvider(metricsProvider);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void disconnect() throws IgniteSpiException {
+            delegate.disconnect();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+            delegate.setAuthenticator(auth);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getGridStartTime() {
+            return delegate.getGridStartTime();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+            delegate.sendCustomEvent(msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void failNode(UUID nodeId, @Nullable String warning) {
+            delegate.failNode(nodeId, warning);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClientMode() throws IllegalStateException {
+            return delegate.isClientMode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+            delegate.spiStart(igniteInstanceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            delegate.spiStop();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onContextInitialized0(final IgniteSpiContext spiCtx) throws IgniteSpiException {
+            delegate.onContextInitialized(spiCtx);
+        }
+
+        /** {@inheritDoc} Also injects into the delegate if it is an {@link IgniteSpiAdapter}; a failure is only logged. */
+        @Override public void injectResources(Ignite ignite) {
+            super.injectResources(ignite);
+
+            if (ignite != null && delegate instanceof IgniteSpiAdapter) {
+                try {
+                    ((IgniteEx)ignite).context().resource().inject(delegate);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Unable to inject resources", e);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean knownNode(UUID nodeId) {
+            return delegate.knownNode(nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean clientReconnectSupported() {
+            return delegate.clientReconnectSupported();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clientReconnect() {
+            delegate.clientReconnect();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean allNodesSupport(IgniteFeatures feature) {
+            return delegate.allNodesSupport(feature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void simulateNodeFailure() {
+            delegate.simulateNodeFailure();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+            delegate.setInternalListener(lsnr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean supportsCommunicationFailureResolve() {
+            return delegate.supportsCommunicationFailureResolve();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+            delegate.resolveCommunicationFailure(node, err);
+        }
+
+        /** Data exchange wrapper that reports each bag passed to collect() to the test closure before delegating. */
+        private class DelegatedDiscoverySpiDataExchange implements DiscoverySpiDataExchange {
+            /** Discovery data exchange delegate. */
+            private final DiscoverySpiDataExchange delegate;
+
+            /**
+             * @param delegate Discovery data exchange delegate.
+             */
+            public DelegatedDiscoverySpiDataExchange(DiscoverySpiDataExchange delegate) {
+                this.delegate = delegate;
+            }
+
+            /** {@inheritDoc} Passes the local node and the bag to the closure, if one is set, before delegating. */
+            @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+                if (dataExchangeCollectClosure != null)
+                    dataExchangeCollectClosure.accept(getLocalNode(), dataBag);
+
+                return delegate.collect(dataBag);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onExchange(DiscoveryDataBag dataBag) {
+                delegate.onExchange(dataBag);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
index 9996791..1ca301d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
@@ -63,8 +63,8 @@ public class FilterDataForClientNodeDiscoveryTest extends GridCommonAbstractTest
         startClientGrid(configuration(2));
         startClientGrid(configuration(3));
 
-        assertEquals(5, joinSrvCnt);
-        assertEquals(4, joinCliCnt);
+        assertEquals(3, joinSrvCnt);
+        assertEquals(6, joinCliCnt);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 22feebf..f29d54a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.IgniteDiscoveryMassiveNodeFailTest;
 import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategyTest;
 import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
 import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchangeTest;
 import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
 import org.apache.ignite.spi.discovery.IgniteClientReconnectEventHandlingTest;
 import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest;
@@ -169,7 +170,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
 
     IgniteMetricsOverflowTest.class,
 
-    DiscoveryClientSocketTest.class
+    DiscoveryClientSocketTest.class,
+
+    DiscoverySpiDataExchangeTest.class
 })
 public class IgniteSpiDiscoverySelfTestSuite {
     /** */
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java
index ac73c29..a203006 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
 import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest;
 import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageTest;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchangeTest;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -44,7 +45,8 @@ import org.junit.runners.Suite;
     IgniteCacheReplicatedQuerySelfTest.class,
     DistributedMetaStorageTest.class,
     DistributedMetaStoragePersistentTest.class,
-    IgniteNodeValidationFailedEventTest.class
+    IgniteNodeValidationFailedEventTest.class,
+    DiscoverySpiDataExchangeTest.class
 })
 public class ZookeeperDiscoverySpiTestSuite4 {
     /** */