You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/03/16 17:18:14 UTC

[geode] branch feature/GEODE-7877 updated: GEODE-7877: deal with the static Version map in TcpClient

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7877
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-7877 by this push:
     new 729872c  GEODE-7877: deal with the static Version map in TcpClient
729872c is described below

commit 729872cc00585f3d2a2a231b4a635af455cf30fb
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Mar 16 10:14:08 2020 -0700

    GEODE-7877: deal with the static Version map in TcpClient
    
    I've removed the old ServerVersion messaging, which is no longer required.
    Locators will downgrade their messages to older clients based on the
    Version ordinal each client sends with its request.  Clients don't need
    to know which Version the Locators are running because newer-Version
    clients aren't allowed to communicate with older-Version Locators.
---
 .../geode/test/dunit/rules/DistributedRule.java    |   2 -
 .../distributed/internal/tcpserver/TcpClient.java  | 131 +--------------------
 .../distributed/internal/tcpserver/TcpServer.java  | 107 ++++++-----------
 .../internal/tcpserver/TcpServerJUnitTest.java     |   6 -
 4 files changed, 35 insertions(+), 211 deletions(-)

diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedRule.java
index 7799697..1943fcd 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedRule.java
@@ -28,7 +28,6 @@ import org.apache.geode.cache30.ClientServerTestCase;
 import org.apache.geode.cache30.RegionTestCase;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.tcpserver.TcpClient;
 import org.apache.geode.internal.admin.ClientStatsManager;
 import org.apache.geode.internal.cache.DiskStoreObserver;
 import org.apache.geode.internal.cache.InitialImageOperation;
@@ -243,7 +242,6 @@ public class DistributedRule extends AbstractDistributedRule {
       RegionTestCase.preSnapshotRegion = null;
       SocketCreator.resetHostNameCache();
       SocketCreator.resolve_dns = true;
-      TcpClient.clearStaticData();
 
       // clear system properties -- keep alphabetized
       System.clearProperty(GeodeGlossary.GEMFIRE_PREFIX + "log-level");
diff --git a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
index f57b584..11563af 100644
--- a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
+++ b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
@@ -19,26 +19,17 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
-import javax.net.ssl.SSLException;
-
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.serialization.ObjectDeserializer;
 import org.apache.geode.internal.serialization.ObjectSerializer;
-import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
-import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
@@ -54,10 +45,6 @@ public class TcpClient {
 
   private static final int DEFAULT_REQUEST_TIMEOUT = 60 * 2 * 1000;
 
-  @MakeNotStatic
-  private static final Map<HostAndPort, Short> serverVersions =
-      new HashMap<>();
-
   private final TcpSocketCreator socketCreator;
   private final ObjectSerializer objectSerializer;
   private final ObjectDeserializer objectDeserializer;
@@ -147,20 +134,9 @@ public class TcpClient {
       boolean replyExpected) throws IOException, ClassNotFoundException {
     long giveupTime = System.currentTimeMillis() + timeout;
 
-    // Get the GemFire version of the TcpServer first, before sending any other request.
-    short serverVersion = getServerVersion(addr, timeout);
-
-    if (serverVersion > Version.CURRENT_ORDINAL) {
-      serverVersion = Version.CURRENT_ORDINAL;
-    }
-
     // establish the old GossipVersion for the server
     int gossipVersion = TcpServer.getCurrentGossipVersion();
 
-    if (Version.GFE_71.compareTo(serverVersion) > 0) {
-      gossipVersion = TcpServer.getOldGossipVersion();
-    }
-
     long newTimeout = giveupTime - System.currentTimeMillis();
     if (newTimeout <= 0) {
       return null;
@@ -175,22 +151,15 @@ public class TcpClient {
     try {
 
       out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
-
-      if (serverVersion < Version.CURRENT_ORDINAL) {
-        out = new VersionedDataOutputStream(out, Version.fromOrdinalNoThrow(serverVersion, false));
-      }
-
       out.writeInt(gossipVersion);
-      if (gossipVersion > TcpServer.getOldGossipVersion()) {
-        out.writeShort(serverVersion);
-      }
+      out.writeShort(Version.CURRENT_ORDINAL);
 
       objectSerializer.writeObject(request, out);
       out.flush();
 
       if (replyExpected) {
         DataInputStream in = new DataInputStream(sock.getInputStream());
-        in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion));
+        in = new VersionedDataInputStream(in, null);
         try {
           Object response = objectDeserializer.readObject(in);
           logger.debug("received response: {}", response);
@@ -205,13 +174,6 @@ public class TcpClient {
       } else {
         return null;
       }
-    } catch (UnsupportedSerializationVersionException ex) {
-      if (logger.isDebugEnabled()) {
-        logger
-            .debug("Remote TcpServer version: " + serverVersion + " is higher than local version: "
-                + Version.CURRENT_ORDINAL + ". This is never expected as remoteVersion");
-      }
-      return null;
     } finally {
       try {
         if (replyExpected) {
@@ -232,93 +194,4 @@ public class TcpClient {
       }
     }
   }
-
-  private Short getServerVersion(HostAndPort addr, int timeout)
-      throws IOException, ClassNotFoundException {
-
-    int gossipVersion;
-    Short serverVersion;
-    Socket sock;
-
-    // Get GemFire version of TcpServer first, before sending any other request.
-    synchronized (serverVersions) {
-      serverVersion = serverVersions.get(addr);
-    }
-
-    if (serverVersion != null) {
-      return serverVersion;
-    }
-
-    gossipVersion = TcpServer.getOldGossipVersion();
-
-    try {
-      sock = socketCreator.forCluster().connect(addr, timeout, null);
-      sock.setSoTimeout(timeout);
-    } catch (SSLException e) {
-      throw new IllegalStateException("Unable to form SSL connection", e);
-    }
-
-    try {
-      OutputStream outputStream = new BufferedOutputStream(sock.getOutputStream());
-      DataOutputStream out =
-          new VersionedDataOutputStream(new DataOutputStream(outputStream), Version.GFE_57);
-
-      out.writeInt(gossipVersion);
-
-      VersionRequest verRequest = new VersionRequest();
-      objectSerializer.writeObject(verRequest, out);
-      out.flush();
-
-      InputStream inputStream = sock.getInputStream();
-      DataInputStream in = new DataInputStream(inputStream);
-      in = new VersionedDataInputStream(in, Version.GFE_57);
-      try {
-        Object readObject = objectDeserializer.readObject(in);
-        if (!(readObject instanceof VersionResponse)) {
-          throw new IllegalThreadStateException(
-              "Server version response invalid: "
-                  + "This could be the result of trying to connect a non-SSL-enabled client to an SSL-enabled locator.");
-        }
-
-        VersionResponse response = (VersionResponse) readObject;
-        serverVersion = response.getVersionOrdinal();
-        synchronized (serverVersions) {
-          serverVersions.put(addr, serverVersion);
-        }
-
-        return serverVersion;
-
-      } catch (EOFException ex) {
-        // old locators will not recognize the version request and will close the connection
-      }
-    } finally {
-      try {
-        sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
-      } catch (Exception e) {
-        logger.error("Error aborting socket ", e);
-      }
-      try {
-        sock.close();
-      } catch (Exception e) {
-        logger.error("Error closing socket ", e);
-      }
-
-    }
-    synchronized (serverVersions) {
-      serverVersions.put(addr, Version.GFE_57.ordinal());
-    }
-    return Short.valueOf(Version.GFE_57.ordinal());
-  }
-
-
-  /**
-   * Clear static class information concerning Locators. This is used in unit tests. It will force
-   * TcpClient to send version-request messages to locators to reestablish knowledge of their
-   * communication protocols.
-   */
-  public static void clearStaticData() {
-    synchronized (serverVersions) {
-      serverVersions.clear();
-    }
-  }
 }
diff --git a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 3b5c159..b811309 100755
--- a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -26,8 +26,6 @@ import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
@@ -37,7 +35,6 @@ import javax.net.ssl.SSLException;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.annotations.internal.MutableForTesting;
 import org.apache.geode.internal.serialization.ObjectDeserializer;
 import org.apache.geode.internal.serialization.ObjectSerializer;
 import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException;
@@ -65,15 +62,6 @@ public class TcpServer {
    */
   public static final int GOSSIPVERSION = 1002;
 
-  /**
-   * Version 1001 was the on-wire protocol version prior to the introduction of the use of
-   * Geode's Version class to designate the on-wire protocol.
-   */
-  public static final int OLDGOSSIPVERSION = 1001;
-
-  @MutableForTesting("The map used here is mutable, because some tests modify it")
-  private static final Map<Integer, Short> GOSSIP_TO_GEMFIRE_VERSION_MAP =
-      createGossipToVersionMap();
   public static final int GOSSIP_BYTE = 0;
   private static final String P2P_BACKLOG_PROPERTY_NAME = "p2p.backlog";
 
@@ -105,17 +93,6 @@ public class TcpServer {
   private final LongSupplier nanoTimeSupplier;
 
 
-  /*
-   * Old on-wire protocol map. This should be removed in a release that breaks all backward
-   * compatibility since it has been replaced with Geode's Version class.
-   */
-  private static Map<Integer, Short> createGossipToVersionMap() {
-    HashMap<Integer, Short> map = new HashMap<>();
-    map.put(GOSSIPVERSION, Version.GFE_71.ordinal());
-    map.put(OLDGOSSIPVERSION, Version.GFE_57.ordinal());
-    return map;
-  }
-
   /**
    * The constructor for TcpServer
    *
@@ -273,7 +250,7 @@ public class TcpServer {
   }
 
   protected void run() {
-    Socket sock = null;
+    Socket sock;
 
     while (!shuttingDown) {
       if (srv_sock.isClosed()) {
@@ -420,56 +397,43 @@ public class TcpServer {
 
     Object request;
     Object response;
-    short versionOrdinal;
-    if (gossipVersion <= getCurrentGossipVersion()
-        && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
-      // Create a versioned stream to remember sender's GemFire version
-      versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
-
-      if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
-        // Recent versions of TcpClient will send the version ordinal
-        versionOrdinal = input.readShort();
-      }
+    short versionOrdinal = input.readShort();
 
-      if (logger.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
-        logger.debug("Locator reading request from " + socket.getInetAddress() + " with version "
-            + Version.fromOrdinal(versionOrdinal));
-      }
-      input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal));
-      request = objectDeserializer.readObject(input);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Locator received request " + request + " from " + socket.getInetAddress());
-      }
-      if (request instanceof ShutdownRequest) {
-        shuttingDown = true;
-        // Don't call shutdown from within the worker thread, see java bug #6576792.
-        // Closing the socket will cause our acceptor thread to shutdown the executor
-        srv_sock.close();
-        response = new ShutdownResponse();
-      } else if (request instanceof VersionRequest) {
-        response = handleVersionRequest(request);
-      } else {
-        response = handler.processRequest(request);
-      }
+    if (logger.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
+      logger.debug("Locator reading request from " + socket.getInetAddress() + " with version "
+          + Version.fromOrdinal(versionOrdinal));
+    }
+    input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal));
+    request = objectDeserializer.readObject(input);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Locator received request " + request + " from " + socket.getInetAddress());
+    }
+    if (request instanceof ShutdownRequest) {
+      shuttingDown = true;
+      // Don't call shutdown from within the worker thread, see java bug #6576792.
+      // Closing the socket will cause our acceptor thread to shutdown the executor
+      srv_sock.close();
+      response = new ShutdownResponse();
+    } else if (request instanceof VersionRequest) {
+      response = handleVersionRequest(request);
+    } else {
+      response = handler.processRequest(request);
+    }
 
-      handler.endRequest(request, startTime);
+    handler.endRequest(request, startTime);
 
-      final long startTime2 = nanoTimeSupplier.getAsLong();
-      if (response != null) {
-        DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-        if (versionOrdinal != Version.CURRENT_ORDINAL) {
-          output =
-              new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal));
-        }
-        objectSerializer.writeObject(response, output);
-        output.flush();
+    final long startTime2 = nanoTimeSupplier.getAsLong();
+    if (response != null) {
+      DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+      if (versionOrdinal != Version.CURRENT_ORDINAL) {
+        output =
+            new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal));
       }
-
-      handler.endResponse(request, startTime2);
-    } else {
-      // Close the socket. We can not accept requests from a newer version
-      rejectUnknownProtocolConnection(socket, gossipVersion);
+      objectSerializer.writeObject(response, output);
+      output.flush();
     }
+
+    handler.endResponse(request, startTime2);
   }
 
   private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) {
@@ -495,9 +459,4 @@ public class TcpServer {
   public static int getCurrentGossipVersion() {
     return GOSSIPVERSION;
   }
-
-  public static int getOldGossipVersion() {
-    return OLDGOSSIPVERSION;
-  }
-
 }
diff --git a/geode-tcp-server/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-tcp-server/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
index 6e43f47..c79d5c8 100644
--- a/geode-tcp-server/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
+++ b/geode-tcp-server/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
@@ -37,7 +37,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
@@ -81,11 +80,6 @@ public class TcpServerJUnitTest {
     assertThat(server.isAlive()).isTrue();
   }
 
-  @Before
-  public void setup() {
-    TcpClient.clearStaticData();
-  }
-
   @Test
   public void testConnectToUnknownHost() throws Exception {
     final TcpClient tcpClient = createTcpClient();