You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/18 16:17:04 UTC
ignite git commit: IGNITE-5943 Communication. Server node may reject
client connection during massive clients join. This closes #2423
Repository: ignite
Updated Branches:
refs/heads/master 45708b972 -> 153931534
IGNITE-5943 Communication. Server node may reject client connection during massive clients join. This closes #2423
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15393153
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15393153
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15393153
Branch: refs/heads/master
Commit: 1539315342d09150a4af942fce2f8147e390f85e
Parents: 45708b9
Author: EdShangGG <es...@gridgain.com>
Authored: Fri Aug 18 18:52:59 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Aug 18 19:13:58 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 32 ++--
.../communication/tcp/TcpCommunicationSpi.java | 60 ++++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 16 ++
.../spi/discovery/tcp/TcpDiscoverySpi.java | 10 ++
.../discovery/tcp/IgniteClientConnectTest.java | 163 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 6 +
6 files changed, 268 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be4aace..0b7a243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -609,21 +609,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean skipStore = opCtx != null && opCtx.skipStore();
- if (asyncOp) {return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
- @Override public IgniteInternalFuture<Map<K, V>> apply() {
- return getAllAsync0(ctx.cacheKeysView(keys),
- forcePrimary,
- subjId0,
- taskName,
- deserializeBinary,
- recovery,
- expiryPlc,
- skipVals,
- skipStore,
- canRemap,
- needVer);
- }
- });}
+ if (asyncOp) {
+ return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
+ @Override public IgniteInternalFuture<Map<K, V>> apply() {
+ return getAllAsync0(ctx.cacheKeysView(keys),
+ forcePrimary,
+ subjId0,
+ taskName,
+ deserializeBinary,
+ recovery,
+ expiryPlc,
+ skipVals,
+ skipStore,
+ canRemap,
+ needVer);
+ }
+ });
+ }
else {
return getAllAsync0(ctx.cacheKeysView(keys),
forcePrimary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1b00b5d..bab9cfa 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -109,6 +109,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -132,6 +133,8 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -141,6 +144,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING;
/**
@@ -296,6 +300,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
*/
public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
+ /**
+ * Version since which a client is able to wait before connecting to a server (may be needed when the client tries
+ * to open a connection before it becomes visible to the server).
+ */
+ private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
+
/** Connection index meta for session. */
private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -442,7 +452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @param ses Session.
* @param msg Message.
*/
- private void onFirstMessage(GridNioSession ses, Message msg) {
+ private void onFirstMessage(final GridNioSession ses, Message msg) {
UUID sndId;
ConnectionKey connKey;
@@ -466,10 +476,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
final ClusterNode rmtNode = getSpiContext().node(sndId);
if (rmtNode == null) {
- U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId +
- ", ses=" + ses + ']');
+ DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi();
+
+ assert discoverySpi instanceof TcpDiscoverySpi;
+
+ TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi;
+
+ ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
+
+ boolean unknownNode = true;
- ses.close();
+ if (node0 != null) {
+ assert node0.isClient() : node0;
+
+ if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
+ unknownNode = false;
+ }
+
+ if (unknownNode) {
+ U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
+
+ ses.close();
+ }
+ else {
+ ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ ses.close();
+ }
+ });
+ }
return;
}
@@ -3031,6 +3066,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
!node.isClient());
+ int lastWaitingTimeout = 1;
+
while (!conn) { // Reconnection on handshake timeout.
try {
SocketChannel ch = SocketChannel.open();
@@ -3101,6 +3138,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
}
+ else if (rcvCnt == NEED_WAIT) {
+ recoveryDesc.release();
+
+ U.closeQuiet(ch);
+
+ if (lastWaitingTimeout < 60000)
+ lastWaitingTimeout *= 2;
+
+ U.sleep(lastWaitingTimeout);
+
+ continue;
+ }
}
finally {
if (recoveryDesc != null && rcvCnt == null)
@@ -4559,6 +4608,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** */
static final long NODE_STOPPING = -2;
+ /** Need wait: sent to a connecting node that is not visible to this node yet, so that it waits and reconnects. */
+ static final long NEED_WAIT = -3;
+
/** Message body size in bytes. */
private static final int MESSAGE_SIZE = 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a6de34b..ca7dd4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1805,6 +1805,22 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+     * Looks a node up by its ID in any state (visible or not).
+     * @param nodeId Node id.
+     * @return Local node if {@code nodeId} is the local node ID, otherwise the result of the ring lookup.
+     */
+    ClusterNode getNode0(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locId = getLocalNodeId();
+
+        // The local node is handed back directly; any other ID is resolved through the ring.
+        boolean loc = locId != null && locId.equals(nodeId);
+
+        return loc ? locNode : ring.node(nodeId);
+    }
+
+ /**
* Thread that cleans IP finder and keeps it in the correct state, unregistering
* addresses of the nodes that has left the topology.
* <p>
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c988d7e..e6eaa8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -433,6 +433,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
return impl.getNode(nodeId);
}
+    /**
+     * Resolves a node by ID: delegates to {@code ServerImpl.getNode0(UUID)} when the underlying implementation is a
+     * {@link ServerImpl}, and to {@link #getNode(UUID)} otherwise.
+     * @param id Id.
+     * @return Node returned by whichever lookup was used.
+     */
+    public ClusterNode getNode0(UUID id) {
+        return impl instanceof ServerImpl ? ((ServerImpl)impl).getNode0(id) : getNode(id);
+    }
+
/** {@inheritDoc} */
@Override public boolean pingNode(UUID nodeId) {
return impl.pingNode(nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
new file mode 100644
index 0000000..1a89987
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+
+/**
+ * Emulates a client that receives the message about joining the topology earlier than some server nodes of that
+ * topology do, and then makes this client connect to such servers.
+ * To emulate this the client is connected to the second node in the topology and sending of the message about join
+ * completion to the third node is paused.
+ */
+public class IgniteClientConnectTest extends GridCommonAbstractTest {
+    /** IP finder used by all non-client nodes. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch to stop message sending. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Start client flag. */
+    private final AtomicBoolean clientJustStarted = new AtomicBoolean(false);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+        if (igniteInstanceName.equals("client")) {
+            TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+            clientIpFinder.registerAddresses(Collections.singleton(new InetSocketAddress(InetAddress.getLoopbackAddress(), 47501)));
+
+            disco.setIpFinder(clientIpFinder);
+        }
+        else
+            disco.setIpFinder(ipFinder);
+
+        disco.setJoinTimeout(2 * 60_000);
+        disco.setSocketTimeout(1000);
+        disco.setNetworkTimeout(2000);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<Object, Object>()
+            .setName(DEFAULT_CACHE_NAME)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new RendezvousAffinityFunction(false, 8))
+            .setBackups(0);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Starts a client while the join-finished message is held back and checks that it can read every key.
+     * @throws Exception If failed.
+     */
+    public void testClientConnectToBigTopology() throws Exception {
+        Ignite ignite = startGrids(3);
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < 80; i++) {
+            cache.put(i, i);
+
+            keys.add(i);
+        }
+
+        TcpDiscoveryImpl discovery = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()).discovery();
+
+        assertTrue(discovery instanceof ServerImpl);
+
+        IgniteConfiguration clientCfg = getConfiguration("client");
+
+        clientCfg.setClientMode(true);
+
+        clientJustStarted.set(true);
+
+        IgniteEx client;
+
+        try {
+            client = startGrid(clientCfg);
+        }
+        finally {
+            // Always release the paused discovery threads, even if the client failed to start.
+            latch.countDown();
+        }
+
+        assertEquals(keys.size(), client.cache(DEFAULT_CACHE_NAME).getAll(keys).size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** Discovery SPI that holds back {@link TcpDiscoveryNodeAddFinishedMessage} once the client start has begun. */
+    class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && msg.senderNodeId() != null && clientJustStarted.get()) {
+                try {
+                    latch.await();
+
+                    Thread.sleep(3000);
+                }
+                catch (InterruptedException ignored) {
+                    // Restore the interrupt flag instead of swallowing it; the message is still written below.
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** @return Underlying discovery implementation. */
+        TcpDiscoveryImpl discovery() {
+            return impl;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 1287149..c506ca7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -20,6 +20,8 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
@@ -90,6 +92,10 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
suite.addTest(new TestSuite(AuthenticationRestartTest.class));
+ // Client connect.
+ suite.addTest(new TestSuite(IgniteClientConnectTest.class));
+ suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));
+
// SSL.
suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class));