You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:04:13 UTC

[50/50] [abbrv] ignite git commit: ignite-3054

ignite-3054


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2aa60b38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2aa60b38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2aa60b38

Branch: refs/heads/ignite-3054
Commit: 2aa60b38b73e648ea74c7eb6b6e2c671951f2a3e
Parents: 2397511
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 24 17:57:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 24 17:58:57 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 48 +++-----------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 16 +------
 .../messages/TcpDiscoveryAbstractMessage.java   |  5 +-
 .../messages/TcpDiscoveryHandshakeResponse.java | 14 ------
 4 files changed, 9 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index eeba6f0..3ee0b60 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -660,10 +660,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         ", rmtNodeId=" + rmtNodeId + ']');
 
                 return new JoinResult(new SocketStream(sock),
-                    spi.readReceipt(sock,
-                    timeoutHelper.nextTimeoutChunk(ackTimeout0)),
-                    res.clientAck(),
-                    true);
+                    spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)));
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -1077,9 +1074,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         private Socket sock;
 
         /** */
-        private boolean clientAck;
-
-        /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
@@ -1089,9 +1083,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         private TcpDiscoveryAbstractMessage unackedMsg;
 
         /** */
-        private boolean writeLen;
-
-        /** */
         private volatile long lastMsgReceived;
 
         /** */
@@ -1143,17 +1134,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /**
          * @param sock Socket.
-         * @param clientAck {@code True} is server supports client message acknowlede.
-         * @param writeLen Add message length if {@code true}.
          */
-        private void setSocket(Socket sock, boolean clientAck, boolean writeLen) {
+        private void setSocket(Socket sock) {
             synchronized (mux) {
                 this.sock = sock;
 
-                this.clientAck = clientAck;
-
-                this.writeLen = writeLen;
-
                 unackedMsg = null;
 
                 mux.notifyAll();
@@ -1248,7 +1233,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
                     msgLsnr.apply(msg);
 
-                boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
+                boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
 
                 try {
                     if (ack) {
@@ -1259,10 +1244,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    if (!writeLen)
-                        spi.writeToSocket(sock, msg, sockTimeout);
-                    else
-                        writeToSocketWithLength(msg, sock, sockTimeout);
+                    writeToSocketWithLength(msg, sock, sockTimeout);
 
                     msg = null;
 
@@ -1350,9 +1332,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         private volatile SocketStream sockStream;
 
         /** */
-        private boolean clientAck;
-
-        /** */
         private boolean join;
 
         /**
@@ -1407,7 +1386,6 @@ class ClientImpl extends TcpDiscoveryImpl {
                     }
 
                     sockStream = joinRes.sockStream;
-                    clientAck = joinRes.clientAck;
 
                     Socket sock = sockStream.socket();
 
@@ -1847,7 +1825,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             currSock = joinRes.sockStream;
 
-            sockWriter.setSocket(joinRes.sockStream.socket(), joinRes.clientAck, joinRes.asyncMode);
+            sockWriter.setSocket(joinRes.sockStream.socket());
 
             if (spi.joinTimeout > 0) {
                 final int joinCnt0 = joinCnt;
@@ -2247,7 +2225,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     currSock = reconnector.sockStream;
 
-                    sockWriter.setSocket(currSock.socket(), reconnector.clientAck, true);
+                    sockWriter.setSocket(currSock.socket());
                     sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 
                     reconnector = null;
@@ -2501,25 +2479,13 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private final int receipt;
 
-        /** */
-        private final boolean clientAck;
-
-        /** Marks if client must add length to each message. */
-        private final boolean asyncMode;
-
         /**
          * @param sockStream Socket stream.
          * @param receipt Receipt.
-         * @param clientAck Client acknowledge.
-         * @param asyncMode Clietn async mode.
          */
-        private JoinResult(final SocketStream sockStream, final int receipt,
-            final boolean clientAck,
-            final boolean asyncMode) {
+        private JoinResult(final SocketStream sockStream, final int receipt) {
             this.sockStream = sockStream;
             this.receipt = receipt;
-            this.clientAck = clientAck;
-            this.asyncMode = asyncMode;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5fa58ad..d7d377d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -30,14 +30,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketException;
-import java.net.SocketOption;
 import java.net.SocketTimeoutException;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.AlreadyBoundException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.Channel;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ServerSocketChannel;
@@ -102,7 +99,6 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -199,10 +195,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     private static final int NIO_WORKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
-    private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
-        IgniteProductVersion.fromString("1.5.0");
-
-    /** */
     private IgniteThreadPoolExecutor utilityPool;
 
     /** */
@@ -6449,13 +6441,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 
-                    boolean asyncMode = false;
-
-                    if (req.client()) {
-                        res.clientAck(true);
-
-                        asyncMode = true;
-                    }
+                    boolean asyncMode = req.client();
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                     // the local node sends a handshake request message on the loopback address, so we get here.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index e982b2f..9d64868 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -46,10 +46,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
 
     /** */
-    protected static final int CLIENT_ACK_FLAG_POS = 4;
-
-    /** */
-    protected static final int FORCE_FAIL_FLAG_POS = 8;
+    protected static final int FORCE_FAIL_FLAG_POS = 3;
 
     /** Sender of the message (transient). */
     private transient UUID sndNodeId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2aa60b38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 0d350af..1c09252 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -60,20 +60,6 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
         this.order = order;
     }
 
-    /**
-     * @return {@code True} if server supports client message acknowledge.
-     */
-    public boolean clientAck() {
-        return getFlag(CLIENT_ACK_FLAG_POS);
-    }
-
-    /**
-     * @param clientAck {@code True} if server supports client message acknowledge.
-     */
-    public void clientAck(boolean clientAck) {
-        setFlag(CLIENT_ACK_FLAG_POS, clientAck);
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());