You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/27 12:15:02 UTC
[1/5] ignite git commit: IGNITE-6248 - Throw exception on unsupported
Java 1.7 releases.
Repository: ignite
Updated Branches:
refs/heads/ignite-3478 d46a03950 -> 6466adf54
IGNITE-6248 - Throw exception on unsupported Java 1.7 releases.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e228ce36
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e228ce36
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e228ce36
Branch: refs/heads/ignite-3478
Commit: e228ce3600332f1873f7250b7ca2919e2f3607bc
Parents: f0500e2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Oct 27 11:55:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 11:55:48 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 6 ++++
.../org/apache/ignite/internal/IgnitionEx.java | 34 ++++++++++++++++++--
2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e228ce36/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index d7d4443..4294c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -656,6 +656,12 @@ public final class IgniteSystemProperties {
/** Ignite page memory concurrency level. */
public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
+ /**
+ * Start Ignite on versions of JRE 7 older than 1.7.0_71. For proper work it may require
+ * disabling JIT in some places.
+ */
+ public static final String IGNITE_FORCE_START_JAVA7 = "IGNITE_FORCE_START_JAVA7";
+
/** Returns true for system properties only avoiding sending sensitive information. */
private static final IgnitePredicate<Map.Entry<String, String>> PROPS_FILTER = new IgnitePredicate<Map.Entry<String, String>>() {
@Override public boolean apply(final Map.Entry<String, String> entry) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e228ce36/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 67c771b..d84f8a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -118,6 +118,7 @@ import static org.apache.ignite.IgniteState.STOPPED_ON_SEGMENTATION;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_CLIENT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEP_MODE_OVERRIDE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_FORCE_START_JAVA7;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_HOST;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE;
@@ -189,9 +190,38 @@ public class IgnitionEx {
static {
// Check 1.8 just in case for forward compatibility.
if (!U.jdkVersion().contains("1.7") &&
- !U.jdkVersion().contains("1.8"))
- throw new IllegalStateException("Ignite requires Java 7 or above. Current Java version " +
+ !U.jdkVersion().contains("1.8")) {
+ throw new IllegalStateException("Ignite requires Java 1.7.0_71 or above. Current Java version " +
"is not supported: " + U.jdkVersion());
+ }
+
+ String jreVer = U.jreVersion();
+
+ if (jreVer.startsWith("1.7")) {
+ int upd = jreVer.indexOf('_');
+ int beta = jreVer.indexOf('-');
+
+ if (beta < 0)
+ beta = jreVer.length();
+
+ if (upd > 0 && beta > 0) {
+ try {
+ int update = Integer.parseInt(jreVer.substring(upd + 1, beta));
+
+ boolean forceJ7 = IgniteSystemProperties.getBoolean(IGNITE_FORCE_START_JAVA7, false);
+
+ if (update < 71 && !forceJ7) {
+ throw new IllegalStateException("Ignite requires Java 1.7.0_71 or above. Current Java version " +
+ "is not supported: " + jreVer);
+ }
+ else if (forceJ7)
+ System.err.println("Ignite requires Java 1.7.0_71 or above. Start on your own risk.");
+ }
+ catch (NumberFormatException ignore) {
+ // No-op
+ }
+ }
+ }
// To avoid nasty race condition in UUID.randomUUID() in JDK prior to 6u34.
// For details please see:
[3/5] ignite git commit: ignite-5860 Try process
TcpDiscoveryClientReconnectMessage from socket reader instead of always
processing it on coordinator.
Posted by sb...@apache.org.
ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socket reader instead of always processing it on coordinator.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56a63f80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56a63f80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56a63f80
Branch: refs/heads/ignite-3478
Commit: 56a63f80d1181e53a3e2a4c4f88e42226bbac86e
Parents: 717c549
Author: Denis Mekhanikov <dm...@gmail.com>
Authored: Fri Oct 27 14:12:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 14:13:40 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 311 ++++++++++---------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 20 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 275 +++++++++++++++-
5 files changed, 467 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5dbfe6e..139c110 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -26,6 +26,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -470,7 +471,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- * @param recon {@code True} if reconnects.
+ * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to
+ * and {@code null} otherwise.
* @param timeout Timeout.
* @return Opened socket or {@code null} if timeout.
* @throws InterruptedException If interrupted.
@@ -478,9 +480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
- @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+ @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout)
throws IgniteSpiException, InterruptedException {
- Collection<InetSocketAddress> addrs = null;
+ List<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
@@ -489,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new InterruptedException();
while (addrs == null || addrs.isEmpty()) {
- addrs = spi.resolvedAddresses();
+ addrs = new ArrayList<>(spi.resolvedAddresses());
if (!F.isEmpty(addrs)) {
if (log.isDebugEnabled())
@@ -509,22 +511,30 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+ // Process failed node last.
+ if (prevAddr != null) {
+ int idx = addrs.indexOf(prevAddr);
- Iterator<InetSocketAddress> it = addrs.iterator();
+ if (idx != -1)
+ Collections.swap(addrs, idx, 0);
+ }
+
+ Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
boolean wait = false;
- while (it.hasNext()) {
+ for (int i = addrs.size() - 1; i >= 0; i--) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
- InetSocketAddress addr = it.next();
+ InetSocketAddress addr = addrs.get(i);
+
+ boolean recon = prevAddr != null;
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
if (sockAndRes == null) {
- it.remove();
+ addrs.remove(i);
continue;
}
@@ -852,8 +862,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected IgniteSpiThread workerThread() {
- return msgWorker;
+ @Override protected Collection<IgniteSpiThread> threads() {
+ return Arrays.asList(sockWriter, msgWorker);
}
/**
@@ -1336,15 +1346,20 @@ class ClientImpl extends TcpDiscoveryImpl {
private boolean clientAck;
/** */
- private boolean join;
+ private final boolean join;
+
+ /** */
+ private final InetSocketAddress prevAddr;
/**
* @param join {@code True} if reconnects during join.
+ * @param prevAddr Address of the node, that this client was previously connected to.
*/
- protected Reconnector(boolean join) {
+ protected Reconnector(boolean join, InetSocketAddress prevAddr) {
super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
this.join = join;
+ this.prevAddr = prevAddr;
}
/**
@@ -1374,7 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
while (true) {
- T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
+ T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout);
if (joinRes == null) {
if (join) {
@@ -1609,6 +1624,10 @@ class ClientImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof SocketClosedMessage) {
if (((SocketClosedMessage)msg).sock == currSock) {
+ Socket sock = currSock.sock;
+
+ InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());
+
currSock = null;
boolean join = joinLatch.getCount() > 0;
@@ -1637,8 +1656,7 @@ class ClientImpl extends TcpDiscoveryImpl {
assert reconnector == null;
- final Reconnector reconnector = new Reconnector(join);
- this.reconnector = reconnector;
+ reconnector = new Reconnector(join, prevAddr);
reconnector.start();
}
}
@@ -1811,7 +1829,7 @@ class ClientImpl extends TcpDiscoveryImpl {
T2<SocketStream, Boolean> joinRes;
try {
- joinRes = joinTopology(false, spi.joinTimeout);
+ joinRes = joinTopology(null, spi.joinTimeout);
}
catch (IgniteSpiException e) {
joinError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index efe531a..1c3ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -219,6 +219,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
+ /** Messages history used for client reconnect. */
+ private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
/** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
private boolean ipFinderHasLocAddr;
@@ -1663,8 +1666,23 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected IgniteSpiThread workerThread() {
- return msgWorker;
+ @Override protected Collection<IgniteSpiThread> threads() {
+ Collection<IgniteSpiThread> threads;
+
+ synchronized (mux) {
+ threads = new ArrayList<>(readers.size() + clientMsgWorkers.size() + 4);
+ threads.addAll(readers);
+ }
+
+ threads.addAll(clientMsgWorkers.values());
+ threads.add(tcpSrvr);
+ threads.add(ipFinderCleaner);
+ threads.add(msgWorker);
+ threads.add(statsPrinter);
+
+ threads.removeAll(Collections.<IgniteSpiThread>singleton(null));
+
+ return threads;
}
/**
@@ -2122,7 +2140,9 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
- msgs.add(msg);
+ synchronized (msgs) {
+ msgs.add(msg);
+ }
}
/**
@@ -2161,14 +2181,16 @@ class ServerImpl extends TcpDiscoveryImpl {
// Client connection failed before it received TcpDiscoveryNodeAddedMessage.
List<TcpDiscoveryAbstractMessage> res = null;
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
- res = new ArrayList<>(msgs.size());
- }
+ synchronized (msgs) {
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
+ res = new ArrayList<>(msgs.size());
+ }
- if (res != null)
- res.add(prepare(msg, node.id()));
+ if (res != null)
+ res.add(prepare(msg, node.id()));
+ }
}
if (log.isDebugEnabled()) {
@@ -2181,20 +2203,26 @@ class ServerImpl extends TcpDiscoveryImpl {
return res;
}
else {
- if (msgs.isEmpty())
- return Collections.emptyList();
+ Collection<TcpDiscoveryAbstractMessage> cp;
- Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+ boolean skip;
- boolean skip = true;
+ synchronized (msgs) {
+ if (msgs.isEmpty())
+ return Collections.emptyList();
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (skip) {
- if (msg.id().equals(lastMsgId))
- skip = false;
+ cp = new ArrayList<>(msgs.size());
+
+ skip = true;
+
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (skip) {
+ if (msg.id().equals(lastMsgId))
+ skip = false;
+ }
+ else
+ cp.add(prepare(msg, node.id()));
}
- else
- cp.add(prepare(msg, node.id()));
}
cp = !skip ? cp : null;
@@ -2483,9 +2511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending messages. */
private final PendingMessages pendingMsgs = new PendingMessages();
- /** Messages history used for client reconnect. */
- private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
-
/** Last message that updated topology. */
private TcpDiscoveryAbstractMessage lastMsg;
@@ -2659,8 +2684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg instanceof TcpDiscoveryJoinRequestMessage)
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
- else if (msg instanceof TcpDiscoveryClientReconnectMessage)
- processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ if (sendMessageToRemotes(msg))
+ sendMessageAcrossRing(msg);
+ }
else if (msg instanceof TcpDiscoveryNodeAddedMessage)
processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
@@ -2695,9 +2722,6 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (ensured && redirectToClients(msg))
- msgHist.add(msg);
-
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
// Received a message from remote node.
onMessageExchanged();
@@ -2730,6 +2754,9 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
if (redirectToClients(msg)) {
+ if (spi.ensured(msg))
+ msgHist.add(msg);
+
byte[] msgBytes = null;
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
@@ -3836,9 +3863,6 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.client(msg.client());
processNodeAddedMessage(nodeAddedMsg);
-
- if (nodeAddedMsg.verified())
- msgHist.add(nodeAddedMsg);
}
else if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
@@ -3941,98 +3965,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Processes client reconnect message.
- *
- * @param msg Client reconnect message.
- */
- private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
- UUID nodeId = msg.creatorNodeId();
-
- UUID locNodeId = getLocalNodeId();
-
- boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
-
- if (!msg.verified()) {
- TcpDiscoveryNode node = ring.node(nodeId);
-
- assert node == null || node.isClient();
-
- if (node != null) {
- node.clientRouterNodeId(msg.routerNodeId());
- node.clientAliveTime(spi.clientFailureDetectionTimeout());
- }
-
- if (isLocalNodeCoordinator()) {
- msg.verify(locNodeId);
-
- if (node != null) {
- Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
-
- if (pending != null) {
- msg.pendingMessages(pending);
- msg.success(true);
-
- if (log.isDebugEnabled())
- log.debug("Accept client reconnect, restored pending messages " +
- "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Failing reconnecting client node because failed to restore pending " +
- "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
- TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
- node.id(), node.internalOrder());
-
- processNodeFailedMessage(nodeFailedMsg);
-
- if (nodeFailedMsg.verified())
- msgHist.add(nodeFailedMsg);
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (sendMessageToRemotes(msg))
- sendMessageAcrossRing(msg);
- }
- }
- else {
- if (sendMessageToRemotes(msg))
- sendMessageAcrossRing(msg);
- }
- }
- else {
- if (isLocalNodeCoordinator())
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
- sendMessageAcrossRing(msg);
- }
- }
- }
-
- /**
* Processes node added message.
*
* For coordinator node method marks the messages as verified for rest of nodes to apply the
@@ -4078,9 +4010,6 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- if (addFinishMsg.verified())
- msgHist.add(addFinishMsg);
-
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
@@ -5145,9 +5074,6 @@ class ServerImpl extends TcpDiscoveryImpl {
locNodeId, clientNode.id(), clientNode.internalOrder());
processNodeFailedMessage(nodeFailedMsg);
-
- if (nodeFailedMsg.verified())
- msgHist.add(nodeFailedMsg);
}
}
}
@@ -5342,9 +5268,6 @@ class ServerImpl extends TcpDiscoveryImpl {
ackMsg.topologyVersion(msg.topologyVersion());
processCustomMessage(ackMsg);
-
- if (ackMsg.verified())
- msgHist.add(ackMsg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
@@ -5446,12 +5369,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (joiningEmpty && isLocalNodeCoordinator()) {
TcpDiscoveryCustomEventMessage msg;
- while ((msg = pollPendingCustomeMessage()) != null) {
+ while ((msg = pollPendingCustomeMessage()) != null)
processCustomMessage(msg);
-
- if (msg.verified())
- msgHist.add(msg);
- }
}
}
@@ -6005,24 +5924,22 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
- if (clientMsgWrk != null) {
- TcpDiscoverySpiState state = spiStateCopy();
+ TcpDiscoverySpiState state = spiStateCopy();
- if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ if (state == CONNECTED) {
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
- if (clientMsgWrk.getState() == State.NEW)
- clientMsgWrk.start();
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
- msgWorker.addMessage(msg);
+ processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
- continue;
- }
- else {
- spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+ continue;
+ }
+ else {
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
- break;
- }
+ break;
}
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
@@ -6266,6 +6183,100 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Processes client reconnect message.
+ *
+ * @param msg Client reconnect message.
+ */
+ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+ UUID nodeId = msg.creatorNodeId();
+
+ UUID locNodeId = getLocalNodeId();
+
+ boolean isLocNodeRouter = msg.routerNodeId().equals(locNodeId);
+
+ TcpDiscoveryNode node = ring.node(nodeId);
+
+ assert node == null || node.isClient();
+
+ if (node != null) {
+ node.clientRouterNodeId(msg.routerNodeId());
+ node.clientAliveTime(spi.clientFailureDetectionTimeout());
+ }
+
+ if (!msg.verified()) {
+ if (isLocNodeRouter || isLocalNodeCoordinator()) {
+ if (node != null) {
+ Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+
+ if (pending != null) {
+ msg.verify(locNodeId);
+ msg.pendingMessages(pending);
+ msg.success(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Accept client reconnect, restored pending messages " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else if (!isLocalNodeCoordinator()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to restore pending messages for reconnecting client. " +
+ "Forwarding reconnection message to coordinator " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ msg.verify(locNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Failing reconnecting client node because failed to restore pending " +
+ "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+
+ TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+ node.id(), node.internalOrder());
+
+ msgWorker.addMessage(nodeFailedMsg);
+ }
+ }
+ else {
+ msg.verify(locNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
+ }
+
+ if (msg.verified() && isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else
+ msgWorker.addMessage(msg);
+ }
+ else
+ msgWorker.addMessage(msg);
+ }
+ else {
+ if (isLocalNodeCoordinator())
+ msgWorker.addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
+
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
+ msgWorker.addMessage(msg);
+ }
+ }
+
+ /**
* Processes client metrics update message.
*
* @param msg Client metrics update message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b31e2e4..f3cf48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -299,9 +299,9 @@ abstract class TcpDiscoveryImpl {
/**
* <strong>FOR TEST ONLY!!!</strong>
*
- * @return Worker thread.
+ * @return Worker threads.
*/
- protected abstract IgniteSpiThread workerThread();
+ protected abstract Collection<IgniteSpiThread> threads();
/**
* @throws IgniteSpiException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 689ac72..f1c826a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -56,15 +56,9 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** */
private final static long FAILURE_THRESHOLD = 10_000;
- /** */
- private final static long CLIENT_FAILURE_THRESHOLD = 30_000;
-
/** Failure detection timeout for nodes configuration. */
private static long failureThreshold = FAILURE_THRESHOLD;
- /** Client failure detection timeout for nodes configuration. */
- private static long clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
-
/** */
private static boolean useTestSpi;
@@ -75,7 +69,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** {@inheritDoc} */
@Override protected long clientFailureDetectionTimeout() {
- return clientFailureThreshold;
+ return clientFailureDetectionTimeout;
}
/** {@inheritDoc} */
@@ -153,7 +147,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
*/
public void testFailureTimeoutServerClient() throws Exception {
failureThreshold = 3000;
- clientFailureThreshold = 2000;
+ clientFailureDetectionTimeout = 2000;
try {
startServerNodes(1);
@@ -190,13 +184,12 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
long detectTime = failureDetectTime[0] - failureTime;
assertTrue("Client node failure detected too fast: " + detectTime + "ms",
- detectTime > clientFailureThreshold - 200);
+ detectTime > clientFailureDetectionTimeout - 200);
assertTrue("Client node failure detected too slow: " + detectTime + "ms",
- detectTime < clientFailureThreshold + 5000);
+ detectTime < clientFailureDetectionTimeout + 5000);
}
finally {
failureThreshold = FAILURE_THRESHOLD;
- clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
}
}
@@ -207,7 +200,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
*/
public void testFailureTimeout3Server() throws Exception {
failureThreshold = 1000;
- clientFailureThreshold = 10000;
+ clientFailureDetectionTimeout = 10000;
useTestSpi = true;
try {
@@ -254,11 +247,10 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
assertTrue("Server node failure detected too fast: " + detectTime + "ms",
detectTime > failureThreshold - 100);
assertTrue("Server node failure detected too slow: " + detectTime + "ms",
- detectTime < clientFailureThreshold);
+ detectTime < clientFailureDetectionTimeout);
}
finally {
failureThreshold = FAILURE_THRESHOLD;
- clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
useTestSpi = false;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 329783e..ee88b0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -62,8 +62,8 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -73,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -87,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
*/
public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
protected static final AtomicInteger srvIdx = new AtomicInteger();
@@ -123,6 +124,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private static CountDownLatch clientFailedLatch;
/** */
+ private static CountDownLatch clientReconnectedLatch;
+
+ /** */
private static CountDownLatch msgLatch;
/** */
@@ -138,10 +142,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
/** */
+ protected Integer reconnectCnt;
+
+ /** */
private boolean longSockTimeouts;
/** */
- private long clientFailureDetectionTimeout = 1000;
+ protected long clientFailureDetectionTimeout = 1000;
/** */
private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
@@ -207,6 +214,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ if (reconnectCnt != null)
+ disco.setReconnectCount(reconnectCnt);
+
disco.setClientReconnectDisabled(reconnectDisabled);
if (disco instanceof TestTcpDiscoverySpi)
@@ -253,6 +263,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientIpFinder = null;
joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+ clientFailureDetectionTimeout = 1000;
longSockTimeouts = false;
assert G.allGrids().isEmpty();
@@ -558,6 +569,221 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Client should reconnect to available server without EVT_CLIENT_NODE_RECONNECTED event.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterSuspend() throws Exception {
+ reconnectAfterSuspend(false);
+ }
+
+ /**
+ * Client should receive all topology updates after reconnect.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterSuspendTopologyChange() throws Exception {
+ reconnectAfterSuspend(true);
+ }
+
+ /**
+ * @param changeTop If {@code true} topology is changed after client disconnects
+ * @throws Exception if failed.
+ */
+ private void reconnectAfterSuspend(boolean changeTop) throws Exception {
+ reconnectCnt = 2;
+
+ startServerNodes(2);
+
+ Ignite srv0 = grid("server-0");
+ TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+ TcpDiscoveryNode srv1Node = (TcpDiscoveryNode)grid("server-1").cluster().localNode();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+ UUID clientNodeId = clientNode.id();
+
+ checkNodes(2, 1);
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:" + srv1Node.discoveryPort()));
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(2, 1);
+
+ log.info("Pausing router");
+
+ TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv0.configuration().getDiscoverySpi();
+
+ int joinedNodesNum = 3;
+ final CountDownLatch srvJoinedLatch = new CountDownLatch(joinedNodesNum);
+
+ if (changeTop) {
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event e) {
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+
+ srvSpi.pauseAll(true);
+
+ if (changeTop)
+ startServerNodes(joinedNodesNum);
+
+ try {
+ await(srvFailedLatch, 60_000);
+
+ if (changeTop)
+ await(srvJoinedLatch, 5000);
+
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ assertEquals(srv1Node.id(), clientNode.clientRouterNodeId());
+ }
+ finally {
+ srvSpi.resumeAll();
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testClientReconnectHistoryMissingOnRouter() throws Exception {
+ clientFailureDetectionTimeout = 60000;
+ netTimeout = 60000;
+
+ startServerNodes(2);
+
+ Ignite srv0 = grid("server-0");
+ TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+ startClientNodes(1);
+
+ attachListeners(0, 1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+ UUID clientNodeId = clientNode.id();
+
+ checkNodes(2, 1);
+
+ clientSpi.pauseAll(true);
+
+ stopGrid(srv0.name());
+
+ startServerNodes(1);
+
+ Ignite srv2 = grid("server-2");
+ TcpDiscoveryNode srv2Node = (TcpDiscoveryNode)srv2.cluster().localNode();
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv2Node.discoveryPort()));
+
+ clientSpi.resumeAll();
+
+ awaitPartitionMapExchange();
+
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ assertEquals(srv2Node.id(), clientNode.clientRouterNodeId());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterPause() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+ clientReconnectedLatch = new CountDownLatch(1);
+
+ attachListeners(0, 1);
+
+ clientSpi.pauseAll(false);
+
+ try {
+ clientSpi.brakeConnection();
+
+ Thread.sleep(clientFailureDetectionTimeout() * 2);
+ }
+ finally {
+ clientSpi.resumeAll();
+ }
+
+ await(clientReconnectedLatch);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testReconnectAfterMassiveTopologyChange() throws Exception {
+ clientIpFinder = IP_FINDER;
+
+ clientFailureDetectionTimeout = 60000;
+ netTimeout = 60000;
+
+ int initSrvsNum = 5;
+ int killNum = 3;
+ int iterations = 10;
+
+ startServerNodes(initSrvsNum);
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+ final UUID clientNodeId = clientNode.id();
+
+ final CountDownLatch srvJoinedLatch = new CountDownLatch(iterations * killNum);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event e) {
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+
+ int minAliveSrvId = 0;
+
+ for (int i = 0; i < iterations; i++) {
+ for (int j = 0; j < killNum; j++) {
+ stopGrid(minAliveSrvId);
+
+ minAliveSrvId++;
+ }
+
+ startServerNodes(killNum);
+
+ awaitPartitionMapExchange();
+ }
+
+ await(srvJoinedLatch);
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testClientReconnectOnNetworkProblem() throws Exception {
@@ -1410,17 +1636,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
srvSpi.failNode(client.cluster().localNode().id(), null);
- if (changeTop) {
- Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(failLatch.await(5000, MILLISECONDS));
- srvNodeIds.add(g.cluster().localNode().id());
+ if (changeTop) {
+ startServerNodes(1);
clientSpi.resumeAll();
}
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
assertTrue(reconnectLatch.await(5000, MILLISECONDS));
- assertTrue(failLatch.await(5000, MILLISECONDS));
assertTrue(joinLatch.await(5000, MILLISECONDS));
long topVer = changeTop ? 5L : 4L;
@@ -2026,6 +2251,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}, EVT_NODE_FAILED);
}
}
+
+ if (clientReconnectedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Reconnected event fired on client: " + evt);
+
+ clientReconnectedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+ }
+ }
}
/**
@@ -2095,7 +2334,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws InterruptedException If interrupted.
*/
protected void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+ await(latch, awaitTime());
+ }
+
+ /**
+ * @param latch Latch.
+ * @param timeout Timeout.
+ * @throws InterruptedException If interrupted.
+ */
+ protected void await(CountDownLatch latch, long timeout) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(timeout, MILLISECONDS));
}
/**
@@ -2324,8 +2572,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void pauseAll(boolean suspend) {
pauseResumeOperation(true, openSockLock, writeLock);
- if (suspend)
- impl.workerThread().suspend();
+ if (suspend) {
+ for (Thread t : impl.threads())
+ t.suspend();
+ }
}
/**
@@ -2334,7 +2584,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void resumeAll() {
pauseResumeOperation(false, openSockLock, writeLock);
- impl.workerThread().resume();
+ for (IgniteSpiThread t : impl.threads())
+ t.resume();
}
/** {@inheritDoc} */
[2/5] ignite git commit: IGNITE-6768 .NET: Thin client: Fix cache id
calculation
Posted by sb...@apache.org.
IGNITE-6768 .NET: Thin client: Fix cache id calculation
Do not force lower case
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/717c5492
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/717c5492
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/717c5492
Branch: refs/heads/ignite-3478
Commit: 717c549248eb377dd0dc7b28c8707d2e496c5a4e
Parents: e228ce3
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Oct 27 12:56:37 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Oct 27 12:56:37 2017 +0300
----------------------------------------------------------------------
.../Binary/BinaryBuilderSelfTest.cs | 10 ++---
.../Binary/EnumsTest.cs | 2 +-
.../Client/Cache/CacheTest.cs | 42 ++++++++++++++++++++
.../Client/RawSocketTest.cs | 2 +-
.../Impl/Binary/BinaryUtils.cs | 30 +++++++++++---
.../Impl/Binary/Marshaller.cs | 4 +-
.../Impl/Binary/SerializableSerializer.cs | 18 ++++-----
7 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
index 61f90a3..5837ab1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/BinaryBuilderSelfTest.cs
@@ -1600,7 +1600,7 @@ namespace Apache.Ignite.Core.Tests.Binary
Assert.AreEqual(IdMapper.TestTypeId, _grid.GetBinary().GetTypeId(IdMapper.TestTypeName));
- Assert.AreEqual(BinaryUtils.GetStringHashCode("someTypeName"), _grid.GetBinary().GetTypeId("someTypeName"));
+ Assert.AreEqual(BinaryUtils.GetStringHashCodeLowerCase("someTypeName"), _grid.GetBinary().GetTypeId("someTypeName"));
}
/// <summary>
@@ -1615,7 +1615,7 @@ namespace Apache.Ignite.Core.Tests.Binary
var binType = bin.GetBinaryType();
- Assert.AreEqual(BinaryUtils.GetStringHashCode(NameMapper.TestTypeName + "_"), binType.TypeId);
+ Assert.AreEqual(BinaryUtils.GetStringHashCodeLowerCase(NameMapper.TestTypeName + "_"), binType.TypeId);
Assert.AreEqual(17, bin.GetField<int>(NameMapper.TestFieldName));
}
@@ -1666,7 +1666,7 @@ namespace Apache.Ignite.Core.Tests.Binary
var enumVal = TestEnumRegistered.Two;
var intVal = (int) enumVal;
var typeName = GetTypeName(typeof(TestEnumRegistered));
- var typeId = BinaryUtils.GetStringHashCode(typeName);
+ var typeId = BinaryUtils.GetStringHashCodeLowerCase(typeName);
var binEnums = new[]
{
@@ -2170,13 +2170,13 @@ namespace Apache.Ignite.Core.Tests.Binary
/** <inheritdoc /> */
public int GetTypeId(string typeName)
{
- return typeName == TestTypeName ? TestTypeId : BinaryUtils.GetStringHashCode(typeName);
+ return typeName == TestTypeName ? TestTypeId : BinaryUtils.GetStringHashCodeLowerCase(typeName);
}
/** <inheritdoc /> */
public int GetFieldId(int typeId, string fieldName)
{
- return BinaryUtils.GetStringHashCode(fieldName);
+ return BinaryUtils.GetStringHashCodeLowerCase(fieldName);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
index 18ef29a..8993fb4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Binary/EnumsTest.cs
@@ -88,7 +88,7 @@ namespace Apache.Ignite.Core.Tests.Binary
else
{
Assert.AreEqual(string.Format("BinaryEnum [typeId={0}, enumValue={1}]",
- BinaryUtils.GetStringHashCode(typeof(T).FullName), binRes.EnumValue), binRes.ToString());
+ BinaryUtils.GetStringHashCodeLowerCase(typeof(T).FullName), binRes.EnumValue), binRes.ToString());
}
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
index f2dd1de..cfdce73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTest.cs
@@ -845,6 +845,48 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
}
}
+ /// <summary>
+ /// Tests various cache names.
+ /// Cache id as calculated as a hash code and passed to the server side; this test verifies correct id
+ /// calculation for different strings.
+ /// </summary>
+ [Test]
+ public void TestCacheNames()
+ {
+ var cacheNames = new[]
+ {
+ "foo-bar",
+ "Foo-Bar",
+ "FOO-BAR",
+ "testCache1",
+ "TestCache2",
+ "TESTCACHE3",
+ new string('c', 100),
+ new string('C', 100),
+ Guid.NewGuid().ToString(),
+ "тест",
+ "Тест",
+ "ТЕСТ",
+ "тест1",
+ "Тест2",
+ "ТЕСТ3"
+ };
+
+ var ignite = Ignition.GetIgnite();
+
+ for (var i = 0; i < cacheNames.Length; i++)
+ {
+ var cacheName = cacheNames[i];
+ ignite.CreateCache<int, string>(cacheName).Put(i, cacheName);
+
+ using (var client = GetClient())
+ {
+ var cache = client.GetCache<int, string>(cacheName);
+ Assert.AreEqual(cacheName, cache[i]);
+ }
+ }
+ }
+
private class Container
{
public Container Inner;
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
index b637e88..0f1358a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/RawSocketTest.cs
@@ -54,7 +54,7 @@ namespace Apache.Ignite.Core.Tests.Client
{
stream.WriteShort(1); // OP_GET
stream.WriteLong(1); // Request id.
- var cacheId = BinaryUtils.GetStringHashCode(cache.Name);
+ var cacheId = BinaryUtils.GetStringHashCodeLowerCase(cache.Name);
stream.WriteInt(cacheId);
stream.WriteByte(0); // Flags (withSkipStore, etc)
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
index 5233db8..20fea02 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
@@ -23,12 +23,10 @@ namespace Apache.Ignite.Core.Impl.Binary
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
- using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using Apache.Ignite.Core.Binary;
- using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
@@ -1037,6 +1035,8 @@ namespace Apache.Ignite.Core.Impl.Binary
{
var elemType = val.GetType().GetElementType();
+ Debug.Assert(elemType != null);
+
var typeId = ObjTypeId;
if (elemType != typeof(object))
@@ -1333,9 +1333,9 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
- /// Gets the string hash code using Java algorithm.
+ /// Gets the string hash code using Java algorithm, converting English letters to lower case.
/// </summary>
- public static int GetStringHashCode(string val)
+ public static int GetStringHashCodeLowerCase(string val)
{
if (val == null)
return 0;
@@ -1353,6 +1353,26 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
+ /// Gets the string hash code using Java algorithm.
+ /// </summary>
+ private static int GetStringHashCode(string val)
+ {
+ if (val == null)
+ return 0;
+
+ int hash = 0;
+
+ unchecked
+ {
+ // ReSharper disable once LoopCanBeConvertedToQuery (performance)
+ foreach (var c in val)
+ hash = 31 * hash + c;
+ }
+
+ return hash;
+ }
+
+ /// <summary>
/// Gets the cache identifier.
/// </summary>
public static int GetCacheId(string cacheName)
@@ -1447,7 +1467,7 @@ namespace Apache.Ignite.Core.Impl.Binary
}
if (id == 0)
- id = GetStringHashCode(fieldName);
+ id = GetStringHashCodeLowerCase(fieldName);
if (id == 0)
throw new BinaryObjectException("Field ID is zero (please provide ID mapper or change field name) " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 55b6121..7212cd6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -694,7 +694,7 @@ namespace Apache.Ignite.Core.Impl.Binary
if (typeId == 0)
{
- typeId = BinaryUtils.GetStringHashCode(typeName);
+ typeId = BinaryUtils.GetStringHashCodeLowerCase(typeName);
}
AddType(type, typeId, typeName, false, false, null, null, serializer, affKeyFldName, false);
@@ -826,7 +826,7 @@ namespace Apache.Ignite.Core.Impl.Binary
if (id == 0)
{
- id = BinaryUtils.GetStringHashCode(typeName);
+ id = BinaryUtils.GetStringHashCodeLowerCase(typeName);
}
return id;
http://git-wip-us.apache.org/repos/asf/ignite/blob/717c5492/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
index 80f267a..fc91edb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/SerializableSerializer.cs
@@ -174,7 +174,7 @@ namespace Apache.Ignite.Core.Impl.Binary
foreach (var dotNetField in dotNetFields)
{
- writer.WriteInt(BinaryUtils.GetStringHashCode(dotNetField));
+ writer.WriteInt(BinaryUtils.GetStringHashCodeLowerCase(dotNetField));
}
}
@@ -617,49 +617,49 @@ namespace Apache.Ignite.Core.Impl.Binary
{
if (fieldType == typeof(byte))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? (sbyte) (byte) fieldVal : fieldVal;
}
if (fieldType == typeof(short))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? (ushort) (short) fieldVal : fieldVal;
}
if (fieldType == typeof(int))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? (uint) (int) fieldVal : fieldVal;
}
if (fieldType == typeof(long))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? (ulong) (long) fieldVal : fieldVal;
}
if (fieldType == typeof(byte[]))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? ConvertArray<byte, sbyte>((byte[]) fieldVal) : fieldVal;
}
if (fieldType == typeof(short[]))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? ConvertArray<short, ushort>((short[]) fieldVal) : fieldVal;
}
if (fieldType == typeof(int[]))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? ConvertArray<int, uint>((int[]) fieldVal) : fieldVal;
}
if (fieldType == typeof(long[]))
{
- return dotNetFields.Contains(BinaryUtils.GetStringHashCode(fieldName))
+ return dotNetFields.Contains(BinaryUtils.GetStringHashCodeLowerCase(fieldName))
? ConvertArray<long, ulong>((long[]) fieldVal) : fieldVal;
}
}
[4/5] ignite git commit: .NET: Fix TestAsciiChars
Posted by sb...@apache.org.
.NET: Fix TestAsciiChars
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/031f63c2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/031f63c2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/031f63c2
Branch: refs/heads/ignite-3478
Commit: 031f63c2e295bad90428e6d8e0edb7d7fafd466b
Parents: 56a63f8
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Oct 27 14:29:30 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Oct 27 14:29:30 2017 +0300
----------------------------------------------------------------------
.../dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/031f63c2/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
index 0c0d0f9..dff7ed7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
@@ -123,7 +123,13 @@ namespace Apache.Ignite.Core.Tests
[Test]
public void TestAsciiChars()
{
- var allowedFiles = new[] {"BinaryStringTest.cs", "BinarySelfTest.cs", "CacheDmlQueriesTest.cs"};
+ var allowedFiles = new[]
+ {
+ "BinaryStringTest.cs",
+ "BinarySelfTest.cs",
+ "CacheDmlQueriesTest.cs",
+ "CacheTest.cs"
+ };
var srcFiles = GetDotNetSourceDir()
.GetFiles("*.cs", SearchOption.AllDirectories)
[5/5] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3478
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6466adf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6466adf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6466adf5
Branch: refs/heads/ignite-3478
Commit: 6466adf54f017219a16cf9835ee0214853db269e
Parents: d46a039 031f63c
Author: sboikov <sb...@gridgain.com>
Authored: Fri Oct 27 15:14:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 15:14:39 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 6 +
.../org/apache/ignite/internal/IgnitionEx.java | 34 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 311 ++++++++++---------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 20 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 275 +++++++++++++++-
.../Binary/BinaryBuilderSelfTest.cs | 10 +-
.../Binary/EnumsTest.cs | 2 +-
.../Client/Cache/CacheTest.cs | 42 +++
.../Client/RawSocketTest.cs | 2 +-
.../ProjectFilesTest.cs | 8 +-
.../Impl/Binary/BinaryUtils.cs | 30 +-
.../Impl/Binary/Marshaller.cs | 4 +-
.../Impl/Binary/SerializableSerializer.cs | 18 +-
15 files changed, 597 insertions(+), 221 deletions(-)
----------------------------------------------------------------------