You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:04:13 UTC
[50/50] [abbrv] ignite git commit: ignite-3054
ignite-3054
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2aa60b38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2aa60b38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2aa60b38
Branch: refs/heads/ignite-3054
Commit: 2aa60b38b73e648ea74c7eb6b6e2c671951f2a3e
Parents: 2397511
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 24 17:57:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 24 17:58:57 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 48 +++-----------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 16 +------
.../messages/TcpDiscoveryAbstractMessage.java | 5 +-
.../messages/TcpDiscoveryHandshakeResponse.java | 14 ------
4 files changed, 9 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index eeba6f0..3ee0b60 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -660,10 +660,7 @@ class ClientImpl extends TcpDiscoveryImpl {
", rmtNodeId=" + rmtNodeId + ']');
return new JoinResult(new SocketStream(sock),
- spi.readReceipt(sock,
- timeoutHelper.nextTimeoutChunk(ackTimeout0)),
- res.clientAck(),
- true);
+ spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)));
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
@@ -1077,9 +1074,6 @@ class ClientImpl extends TcpDiscoveryImpl {
private Socket sock;
/** */
- private boolean clientAck;
-
- /** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/** */
@@ -1089,9 +1083,6 @@ class ClientImpl extends TcpDiscoveryImpl {
private TcpDiscoveryAbstractMessage unackedMsg;
/** */
- private boolean writeLen;
-
- /** */
private volatile long lastMsgReceived;
/** */
@@ -1143,17 +1134,11 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param sock Socket.
- * @param clientAck {@code True} is server supports client message acknowlede.
- * @param writeLen Add message length if {@code true}.
*/
- private void setSocket(Socket sock, boolean clientAck, boolean writeLen) {
+ private void setSocket(Socket sock) {
synchronized (mux) {
this.sock = sock;
- this.clientAck = clientAck;
-
- this.writeLen = writeLen;
-
unackedMsg = null;
mux.notifyAll();
@@ -1248,7 +1233,7 @@ class ClientImpl extends TcpDiscoveryImpl {
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
msgLsnr.apply(msg);
- boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
+ boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
try {
if (ack) {
@@ -1259,10 +1244,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- if (!writeLen)
- spi.writeToSocket(sock, msg, sockTimeout);
- else
- writeToSocketWithLength(msg, sock, sockTimeout);
+ writeToSocketWithLength(msg, sock, sockTimeout);
msg = null;
@@ -1350,9 +1332,6 @@ class ClientImpl extends TcpDiscoveryImpl {
private volatile SocketStream sockStream;
/** */
- private boolean clientAck;
-
- /** */
private boolean join;
/**
@@ -1407,7 +1386,6 @@ class ClientImpl extends TcpDiscoveryImpl {
}
sockStream = joinRes.sockStream;
- clientAck = joinRes.clientAck;
Socket sock = sockStream.socket();
@@ -1847,7 +1825,7 @@ class ClientImpl extends TcpDiscoveryImpl {
currSock = joinRes.sockStream;
- sockWriter.setSocket(joinRes.sockStream.socket(), joinRes.clientAck, joinRes.asyncMode);
+ sockWriter.setSocket(joinRes.sockStream.socket());
if (spi.joinTimeout > 0) {
final int joinCnt0 = joinCnt;
@@ -2247,7 +2225,7 @@ class ClientImpl extends TcpDiscoveryImpl {
currSock = reconnector.sockStream;
- sockWriter.setSocket(currSock.socket(), reconnector.clientAck, true);
+ sockWriter.setSocket(currSock.socket());
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
reconnector = null;
@@ -2501,25 +2479,13 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private final int receipt;
- /** */
- private final boolean clientAck;
-
- /** Marks if client must add length to each message. */
- private final boolean asyncMode;
-
/**
* @param sockStream Socket stream.
* @param receipt Receipt.
- * @param clientAck Client acknowledge.
- * @param asyncMode Clietn async mode.
*/
- private JoinResult(final SocketStream sockStream, final int receipt,
- final boolean clientAck,
- final boolean asyncMode) {
+ private JoinResult(final SocketStream sockStream, final int receipt) {
this.sockStream = sockStream;
this.receipt = receipt;
- this.clientAck = clientAck;
- this.asyncMode = asyncMode;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5fa58ad..d7d377d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -30,14 +30,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
-import java.net.SocketOption;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AlreadyBoundException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
@@ -102,7 +99,6 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
@@ -199,10 +195,6 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int NIO_WORKER_META = GridNioSessionMetaKey.nextUniqueKey();
/** */
- private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
- IgniteProductVersion.fromString("1.5.0");
-
- /** */
private IgniteThreadPoolExecutor utilityPool;
/** */
@@ -6449,13 +6441,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeResponse res =
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
- boolean asyncMode = false;
-
- if (req.client()) {
- res.clientAck(true);
-
- asyncMode = true;
- }
+ boolean asyncMode = req.client();
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index e982b2f..9d64868 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -46,10 +46,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
/** */
- protected static final int CLIENT_ACK_FLAG_POS = 4;
-
- /** */
- protected static final int FORCE_FAIL_FLAG_POS = 8;
+ protected static final int FORCE_FAIL_FLAG_POS = 3;
/** Sender of the message (transient). */
private transient UUID sndNodeId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 0d350af..1c09252 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -60,20 +60,6 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
this.order = order;
}
- /**
- * @return {@code True} if server supports client message acknowledge.
- */
- public boolean clientAck() {
- return getFlag(CLIENT_ACK_FLAG_POS);
- }
-
- /**
- * @param clientAck {@code True} if server supports client message acknowledge.
- */
- public void clientAck(boolean clientAck) {
- setFlag(CLIENT_ACK_FLAG_POS, clientAck);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());