You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/11/07 01:33:30 UTC

[geode] branch develop updated: GEODE-3637: Revert changes to client queue initialization

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f53bdf0  GEODE-3637: Revert changes to client queue initialization
f53bdf0 is described below

commit f53bdf0306c9fb522d6aa64a855a0415b158b8d9
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Mon Nov 6 16:26:42 2017 -0800

    GEODE-3637: Revert changes to client queue initialization
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 312 +++++++++------------
 .../cache/tier/sockets/ServerConnection.java       |  51 ++--
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 263 -----------------
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 +-
 4 files changed, 157 insertions(+), 491 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 21a0ac3..5b289a9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -103,7 +103,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
-  private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
 
   protected final CacheServerStats stats;
   private final int maxConnections;
@@ -116,11 +115,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private final ThreadPoolExecutor hsPool;
 
   /**
-   * A pool used to process client-queue-initializations.
-   */
-  private final ThreadPoolExecutor clientQueueInitPool;
-
-  /**
    * The port on which this acceptor listens for client connections
    */
   private final int localPort;
@@ -540,126 +534,103 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    pool = initializeServerConnectionThreadPool();
-    hsPool = initializeHandshakerThreadPool();
-    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
-
-    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
-    String postAuthzFactoryName =
-        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
-    isPostAuthzCallbackPresent =
-        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
-  }
-
-  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
-    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-    ThreadFactory socketThreadFactory = new ThreadFactory() {
-      AtomicInteger connNum = new AtomicInteger(-1);
-
-      @Override
-      public Thread newThread(Runnable command) {
-        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
-        getStats().incAcceptThreadsCreated();
-        return new Thread(socketThreadGroup, command, threadName);
-      }
-    };
-    try {
-      final BlockingQueue blockingQueue = new SynchronousQueue();
-      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
-        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
-          try {
-            blockingQueue.put(r);
-          } catch (InterruptedException ex) {
-            Thread.currentThread().interrupt(); // preserve the state
-            throw new RejectedExecutionException(
-                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+    {
+      ThreadPoolExecutor tmp_pool = null;
+      String gName = "ServerConnection "
+          // + serverSock.getInetAddress()
+          + "on port " + this.localPort;
+      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+      ThreadFactory socketThreadFactory = new ThreadFactory() {
+        int connNum = -1;
+
+        public Thread newThread(final Runnable command) {
+          int tnum;
+          synchronized (this) {
+            tnum = ++connNum;
           }
+          String tName = socketThreadGroup.getName() + " Thread " + tnum;
+          getStats().incConnectionThreadsCreated();
+          Runnable r = new Runnable() {
+            public void run() {
+              try {
+                command.run();
+              } catch (CancelException e) { // bug 39463
+                // ignore
+              } finally {
+                ConnectionTable.releaseThreadsSockets();
+              }
+            }
+          };
+          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
-      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
-          socketThreadFactory, rejectedExecutionHandler);
-    } catch (IllegalArgumentException poolInitException) {
-      this.stats.close();
-      this.serverSock.close();
-      this.pool.shutdownNow();
-      throw poolInitException;
+      try {
+        if (isSelector()) {
+          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+        } else {
+          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+        }
+      } catch (IllegalArgumentException poolInitException) {
+        this.stats.close();
+        this.serverSock.close();
+        throw poolInitException;
+      }
+      this.pool = tmp_pool;
     }
-  }
-
-  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
-    final ThreadGroup clientQueueThreadGroup =
-        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
+    {
+      ThreadPoolExecutor tmp_hsPool = null;
+      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
 
-    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
-      AtomicInteger connNum = new AtomicInteger(-1);
+      ThreadFactory socketThreadFactory = new ThreadFactory() {
+        int connNum = -1;
 
-      @Override
-      public Thread newThread(final Runnable command) {
-        String threadName =
-            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
-        Runnable runnable = new Runnable() {
-          public void run() {
-            try {
-              command.run();
-            } catch (CancelException e) {
-              logger.debug("Client Queue Initialization was canceled.", e);
-            }
+        public Thread newThread(Runnable command) {
+          int tnum;
+          synchronized (this) {
+            tnum = ++connNum;
           }
-        };
-        return new Thread(clientQueueThreadGroup, runnable, threadName);
-      }
-    };
-    return new PooledExecutorWithDMStats(new SynchronousQueue(), 16, getStats().getCnxPoolHelper(),
-        clientQueueThreadFactory, 60000);
-  }
-
-  private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
-    String gName = "ServerConnection "
-        // + serverSock.getInetAddress()
-        + "on port " + this.localPort;
-    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-    ThreadFactory socketThreadFactory = new ThreadFactory() {
-      AtomicInteger connNum = new AtomicInteger(-1);
-
-      @Override
-      public Thread newThread(final Runnable command) {
-        String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
-        getStats().incConnectionThreadsCreated();
-        Runnable r = new Runnable() {
-          public void run() {
+          String tName = socketThreadGroup.getName() + " Thread " + tnum;
+          getStats().incAcceptThreadsCreated();
+          return new Thread(socketThreadGroup, command, tName);
+        }
+      };
+      try {
+        final BlockingQueue bq = new SynchronousQueue();
+        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
             try {
-              command.run();
-            } catch (CancelException e) { // bug 39463
-              // ignore
-            } finally {
-              ConnectionTable.releaseThreadsSockets();
+              bq.put(r);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt(); // preserve the state
+              throw new RejectedExecutionException(
+                  LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
             }
           }
         };
-        return new Thread(socketThreadGroup, r, tName);
-      }
-    };
-    try {
-      if (isSelector()) {
-        return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-            getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-      } else {
-        return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
-            TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
+            socketThreadFactory, reh);
+      } catch (IllegalArgumentException poolInitException) {
+        this.stats.close();
+        this.serverSock.close();
+        this.pool.shutdownNow();
+        throw poolInitException;
       }
-    } catch (IllegalArgumentException poolInitException) {
-      this.stats.close();
-      this.serverSock.close();
-      throw poolInitException;
+      this.hsPool = tmp_hsPool;
     }
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
   }
 
   public long getAcceptorId() {
@@ -695,8 +666,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   @Deprecated
   private static final int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private static final int HANDSHAKE_POOL_SIZE = Integer
-      .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
+  private static final int HANDSHAKE_POOL_SIZE =
+      Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
 
   @Override
   public void start() throws IOException {
@@ -831,9 +802,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
-    if (emergencyClassesLoaded) {
+    if (emergencyClassesLoaded)
       return;
-    }
     emergencyClassesLoaded = true;
     CachedRegionHelper.loadEmergencyClasses();
     ServerConnection.loadEmergencyClasses();
@@ -900,9 +870,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private Selector tmpSel;
 
   private void checkForStuckKeys() {
-    if (!WORKAROUND_SELECTOR_BUG) {
+    if (!WORKAROUND_SELECTOR_BUG)
       return;
-    }
     if (tmpSel == null) {
       try {
         tmpSel = Selector.open();
@@ -918,9 +887,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     while (it.hasNext()) {
       SelectionKey sk = (SelectionKey) it.next();
       ServerConnection sc = (ServerConnection) sk.attachment();
-      if (sc == null) {
+      if (sc == null)
         continue;
-      }
       try {
         sk.cancel();
         this.selector.selectNow(); // clear the cancelled key
@@ -1072,7 +1040,40 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           break;
         }
         if (events == 0) {
+          // zeroEventsCount++;
+          // if (zeroEventsCount > 0) {
+          // zeroEventsCount = 0;
           checkForStuckKeys();
+
+          // try {
+          // this.selector.close(); // this selector is sick!
+          // } catch (IOException ignore) {
+          // }
+          // this.selector = Selector.open();
+          // {
+          // Iterator it = selectorRegistrations.iterator();
+          // while (it.hasNext()) {
+          // ServerConnection sc = (ServerConnection)it.next();
+          // sc.registerWithSelector2(this.selector);
+          // }
+          // }
+          // }
+          // ArrayList al = new ArrayList();
+          // Iterator keysIt = this.selector.keys().iterator();
+          // while (keysIt.hasNext()) {
+          // SelectionKey sk = (SelectionKey)keysIt.next();
+          // al.add(sk.attachment());
+          // sk.cancel();
+          // }
+          // events = this.selector.selectNow();
+          // Iterator alIt = al.iterator();
+          // while (alIt.hasNext()) {
+          // ServerConnection sc = (ServerConnection)alIt.next();
+          // sc.registerWithSelector2(this.selector);
+          // }
+          // events = this.selector.select();
+          // } else {
+          // zeroEventsCount = 0;
         }
         while (events > 0) {
           int cancelCount = 0;
@@ -1129,11 +1130,16 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
                   logger.warn(
                       LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
                 }
+                // } else if (key.isValid() && key.isConnectable()) {
+                // logger.info("DEBUG isConnectable and isValid key=" + key);
+                // finishCon(sc);
               } else {
                 finishCon(sc);
                 if (key.isValid()) {
                   logger.warn(LocalizedMessage.create(
                       LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
+                  // } else {
+                  // logger.info("DEBUG !isValid key=" + key);
                 }
               }
             } catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1399,10 +1405,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     return this.clientServerCnxCount.get();
   }
 
-  public boolean isNotifyBySubscription() {
-    return notifyBySubscription;
-  }
-
   protected void handleNewClientConnection(final Socket socket,
       final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
@@ -1425,9 +1427,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
-    // initialization to be done in another threadPool
-    if (initializeClientPools(socket, communicationMode)) {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          primary ? "primary" : "secondary", socket);
+      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+          this.notifyBySubscription);
       return;
     }
 
@@ -1493,17 +1498,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
   }
 
-  private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean isPrimaryServerToClient =
-          communicationMode == CommunicationMode.PrimaryServerToClient;
-      clientQueueInitPool
-          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
-      return true;
-    }
-    return false;
-  }
-
   private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
     socket.setSoTimeout(this.acceptTimeout);
     this.socketCreator.configureServerSSLSocket(socket);
@@ -1668,6 +1662,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
    *         will be listened to.
+   *
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1796,6 +1791,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * This method returns a thread safe structure which can be iterated over without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
    * client info.
+   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
@@ -1819,42 +1815,4 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
-
-  private class ClientQueueInitializerTask implements Runnable {
-    private final Socket socket;
-    private final boolean isPrimaryServerToClient;
-    private final AcceptorImpl acceptor;
-
-    public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
-        AcceptorImpl acceptor) {
-      this.socket = socket;
-      this.acceptor = acceptor;
-      this.isPrimaryServerToClient = isPrimaryServerToClient;
-    }
-
-    @Override
-    public void run() {
-      logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          isPrimaryServerToClient ? "primary" : "secondary", socket);
-      try {
-        acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
-            acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
-      } catch (IOException ex) {
-        closeSocket(socket);
-        if (isRunning()) {
-          if (!acceptor.loggedAcceptError) {
-            acceptor.loggedAcceptError = true;
-            if (ex instanceof SocketTimeoutException) {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
-            } else {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
-                  ex), ex);
-            }
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 74451e5..0e510af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -189,9 +189,7 @@ public abstract class ServerConnection implements Runnable {
    */
   private volatile int requestSpecificTimeout = -1;
 
-  /**
-   * Tracks the id of the most recent batch to which a reply has been sent
-   */
+  /** Tracks the id of the most recent batch to which a reply has been sent */
   private int latestBatchIdReplied = -1;
 
   /*
@@ -713,9 +711,8 @@ public abstract class ServerConnection implements Runnable {
       // can be used.
       initializeCommands();
       // its initialized in verifyClientConnection call
-      if (!getCommunicationMode().isWAN()) {
+      if (!getCommunicationMode().isWAN())
         initializeClientUserAuths();
-      }
     }
     if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
       Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -895,9 +892,7 @@ public abstract class ServerConnection implements Runnable {
         }
       }
       if (unregisterClient)// last serverconnection call all close on auth objects
-      {
         cleanClientAuths();
-      }
       this.clientUserAuths = null;
       if (needsUnregister) {
         this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -922,9 +917,8 @@ public abstract class ServerConnection implements Runnable {
     ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
     ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
 
-    if (retCua == null) {
+    if (retCua == null)
       return cua;
-    }
     return retCua;
   }
 
@@ -960,9 +954,8 @@ public abstract class ServerConnection implements Runnable {
         boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
 
         // if not successfull, try the old way
-        if (!removed) {
+        if (!removed)
           removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
-        }
         return removed;
 
       } catch (NullPointerException npe) {
@@ -1017,6 +1010,7 @@ public abstract class ServerConnection implements Runnable {
         throw new AuthenticationFailedException("Authentication failed");
       }
 
+
       byte[] credBytes = msg.getPart(0).getSerializedForm();
 
       credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1130,7 +1124,6 @@ public abstract class ServerConnection implements Runnable {
 
   public void run() {
     setOwner();
-
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
@@ -1143,7 +1136,9 @@ public abstract class ServerConnection implements Runnable {
             finishedMsg = true;
           }
         }
-      } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
+      } catch (java.nio.channels.ClosedChannelException ignore) {
+        // ok shutting down
+      } catch (CancelException e) {
         // ok shutting down
       } catch (IOException ex) {
         logger.warn(
@@ -1192,7 +1187,6 @@ public abstract class ServerConnection implements Runnable {
    * If registered with a selector then this will be the key we are registered with.
    */
   // private SelectionKey sKey = null;
-
   /**
    * Register this connection with the given selector for read events. Note that switch the channel
    * to non-blocking so it can be in a selector.
@@ -1208,8 +1202,7 @@ public abstract class ServerConnection implements Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */
-    getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+    /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
   /**
@@ -1232,6 +1225,7 @@ public abstract class ServerConnection implements Runnable {
   }
 
   /**
+   *
    * @return String representing the DistributedSystemMembership of the Client VM
    */
   public String getMembershipID() {
@@ -1271,11 +1265,10 @@ public abstract class ServerConnection implements Runnable {
   }
 
   protected int getClientReadTimeout() {
-    if (this.requestSpecificTimeout == -1) {
+    if (this.requestSpecificTimeout == -1)
       return this.handshake.getClientReadTimeout();
-    } else {
+    else
       return this.requestSpecificTimeout;
-    }
   }
 
   protected boolean isProcessingMessage() {
@@ -1526,9 +1519,7 @@ public abstract class ServerConnection implements Runnable {
     return this.name;
   }
 
-  /**
-   * returns the name of this connection
-   */
+  /** returns the name of this connection */
   public String getName() {
     return this.name;
   }
@@ -1745,13 +1736,11 @@ public abstract class ServerConnection implements Runnable {
     // for backward client it will be store in member variable userAuthId
     // for other look "requestMsg" here and get unique-id from this to get the authzrequest
 
-    if (!AcceptorImpl.isAuthenticationRequired()) {
+    if (!AcceptorImpl.isAuthenticationRequired())
       return null;
-    }
 
-    if (AcceptorImpl.isIntegratedSecurity()) {
+    if (AcceptorImpl.isIntegratedSecurity())
       return null;
-    }
 
     long uniqueId = getUniqueId();
 
@@ -1779,13 +1768,11 @@ public abstract class ServerConnection implements Runnable {
 
   public AuthorizeRequestPP getPostAuthzRequest()
       throws AuthenticationRequiredException, IOException {
-    if (!AcceptorImpl.isAuthenticationRequired()) {
+    if (!AcceptorImpl.isAuthenticationRequired())
       return null;
-    }
 
-    if (AcceptorImpl.isIntegratedSecurity()) {
+    if (AcceptorImpl.isIntegratedSecurity())
       return null;
-    }
 
     // look client version and return authzrequest
     // for backward client it will be store in member variable userAuthId
@@ -1812,9 +1799,7 @@ public abstract class ServerConnection implements Runnable {
     return postAuthReq;
   }
 
-  /**
-   * returns the member ID byte array to be used for creating EventID objects
-   */
+  /** returns the member ID byte array to be used for creating EventID objects */
   public byte[] getEventMemberIDByteArray() {
     return this.memberIdByteArray;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
deleted file mode 100644
index c0b2d07..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.rmi.RemoteException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.InterestPolicy;
-import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.SubscriptionAttributes;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.server.ClientSubscriptionConfig;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedLockBlackboard;
-import org.apache.geode.distributed.DistributedLockBlackboardImpl;
-import org.apache.geode.internal.cache.DiskStoreAttributes;
-import org.apache.geode.internal.cache.InitialImageOperation;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
-import org.apache.geode.test.dunit.rules.SharedCountersRule;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
-import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
-
-@Category(DistributedTest.class)
-public class AcceptorImplClientQueueDUnitTest implements Serializable {
-  private final Host host = Host.getHost(0);
-  private static final int numberOfEntries = 200;
-  private static final AtomicInteger eventCount = new AtomicInteger(0);
-  private static final AtomicBoolean completedClient2 = new AtomicBoolean(false);
-
-  @ClassRule
-  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
-
-  @Rule
-  public CacheRule cacheRule =
-      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
-          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
-
-  @Rule
-  public SerializableTestName name = new SerializableTestName();
-
-  @Rule
-  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
-
-  @Rule
-  public DistributedRestoreSystemProperties restoreSystemProperties =
-      new DistributedRestoreSystemProperties();
-
-  private DistributedLockBlackboard blackboard = null;
-
-  @Before
-  public void setup() throws Exception {
-    blackboard = DistributedLockBlackboardImpl.getInstance();
-  }
-
-  @After
-  public void tearDown() throws RemoteException {
-    blackboard.initCount();
-    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
-      InitialImageOperation.slowImageProcessing = 0;
-      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
-    }));
-  }
-
-  @Test
-  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    VM vm3 = host.getVM(3);
-    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
-      try {
-        return createSubscriptionServer(cacheRule.getCache());
-      } catch (IOException e) {
-        return 0;
-      }
-    });
-
-    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
-      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-      clientCacheFactory.setPoolSubscriptionEnabled(true);
-      clientCacheFactory.setPoolSubscriptionRedundancy(1);
-      clientCacheFactory.setPoolReadTimeout(200);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
-          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
-      ClientRegionFactory<Object, Object> clientRegionFactory =
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-      Region region = clientRegionFactory.create("subscriptionRegion");
-
-      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
-      cache.readyForEvents();
-      cache.close(true);
-    });
-    vm3.invoke("Start Client2 to add entries to region", () -> {
-      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
-      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
-      ClientRegionFactory<Object, Object> clientRegionFactory =
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-      Region region = clientRegionFactory.create("subscriptionRegion");
-
-      for (int i = 0; i < numberOfEntries; i++) {
-        region.put(i, i);
-      }
-      cache.close();
-    });
-
-    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
-      try {
-        int serverPort = createSubscriptionServer(cacheRule.getCache());
-        InitialImageOperation.slowImageProcessing = 30;
-        return serverPort;
-      } catch (IOException e) {
-        return 0;
-      }
-    });
-
-    vm0.invoke("Turn on slow image processsing", () -> {
-      InitialImageOperation.slowImageProcessing = 30;
-    });
-
-    AsyncInvocation<Boolean> completedClient1 =
-        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
-
-          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-          clientCacheFactory.setPoolSubscriptionEnabled(true);
-          clientCacheFactory.setPoolSubscriptionRedundancy(1);
-          clientCacheFactory.setPoolMinConnections(1);
-          clientCacheFactory.setPoolMaxConnections(1);
-          clientCacheFactory.setPoolReadTimeout(200);
-          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
-              .set("durable-client-timeout", "300").set("mcast-port", "0");
-          blackboard.incCount();
-          ClientCache cache = cacheFactory.create();
-
-          ClientRegionFactory<Object, Object> clientRegionFactory =
-              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
-            @Override
-            public void afterCreate(EntryEvent event) {
-              eventCount.incrementAndGet();
-            }
-
-            @Override
-            public void afterUpdate(EntryEvent event) {
-              eventCount.incrementAndGet();
-            }
-          }).create("subscriptionRegion");
-
-          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
-          cache.readyForEvents();
-          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
-              .until(() -> eventCount.get() == numberOfEntries);
-          cache.close();
-          return eventCount.get() == numberOfEntries;
-        });
-
-    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
-      while (true) {
-        Thread.sleep(100);
-        if (blackboard.getCount() == 1) {
-          break;
-        }
-      }
-      ClientCache cache = null;
-      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-      clientCacheFactory.setPoolRetryAttempts(0);
-      clientCacheFactory.setPoolMinConnections(1);
-      clientCacheFactory.setPoolMaxConnections(1);
-      clientCacheFactory.setPoolReadTimeout(200);
-      clientCacheFactory.setPoolSocketConnectTimeout(500);
-      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-      cache = clientCacheFactory.set("mcast-port", "0").create();
-      ClientRegionFactory<Object, Object> clientRegionFactory =
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
-      Region region = clientRegionFactory.create("subscriptionRegion");
-
-      int returnValue = 0;
-      for (int i = 0; i < 100; i++) {
-        returnValue = (int) region.get(i);
-      }
-      cache.close();
-      completedClient2.set(returnValue == 99);
-    });
-    assertTrue(completedClient1.get());
-    assertTrue(vm3.invoke(() -> completedClient2.get()));
-  }
-
-  private int createSubscriptionServer(InternalCache cache) throws IOException {
-    initializeDiskStore(cache);
-    initializeReplicateRegion(cache);
-    return initializeCacheServerWithSubscription(host, cache);
-  }
-
-  private void initializeDiskStore(InternalCache cache) throws IOException {
-    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
-    diskStoreAttributes.name = "clientQueueDS";
-    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
-    cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
-  }
-
-  private void initializeReplicateRegion(InternalCache cache) {
-    cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
-        .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
-        .create("subscriptionRegion");
-  }
-
-  private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
-      throws IOException {
-    CacheServer cacheServer1 = cache.addCacheServer(false);
-    ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
-    clientSubscriptionConfig.setEvictionPolicy("entry");
-    clientSubscriptionConfig.setCapacity(5);
-    clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
-    cacheServer1.setPort(0);
-    cacheServer1.setHostnameForClients(host.getHostName());
-    cacheServer1.start();
-    return cacheServer1.getPort();
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index b65bf86..dc42da8 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,7 +63,6 @@ public class CacheRule extends DistributedExternalResource {
   private final boolean disconnectAfter;
   private final List<VM> createCacheInVMs;
   private final Properties config;
-  private final Properties systemProperties;
 
   public static Builder builder() {
     return new Builder();
@@ -75,19 +74,18 @@ public class CacheRule extends DistributedExternalResource {
     this.disconnectAfter = builder.disconnectAfter;
     this.createCacheInVMs = builder.createCacheInVMs;
     this.config = builder.config;
-    this.systemProperties = builder.systemProperties;
   }
 
   @Override
   protected void before() {
     if (createCacheInAll) {
-      invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
+      invoker().invokeInEveryVMAndController(() -> createCache(config));
     } else {
       if (createCache) {
-        createCache(config, systemProperties);
+        createCache(config);
       }
       for (VM vm : createCacheInVMs) {
-        vm.invoke(() -> createCache(config, systemProperties));
+        vm.invoke(() -> createCache(config));
       }
     }
   }
@@ -110,8 +108,7 @@ public class CacheRule extends DistributedExternalResource {
     return cache.getInternalDistributedSystem();
   }
 
-  private static void createCache(final Properties config, final Properties systemProperties) {
-    System.getProperties().putAll(systemProperties);
+  private static void createCache(final Properties config) {
     cache = (InternalCache) new CacheFactory(config).create();
   }
 
@@ -144,7 +141,6 @@ public class CacheRule extends DistributedExternalResource {
     private boolean disconnectAfter;
     private List<VM> createCacheInVMs = new ArrayList<>();
     private Properties config = new Properties();
-    private Properties systemProperties = new Properties();
 
     public Builder() {
       config.setProperty(LOCATORS, getLocators());
@@ -199,16 +195,6 @@ public class CacheRule extends DistributedExternalResource {
       return this;
     }
 
-    public Builder addSystemProperty(final String key, final String value) {
-      this.systemProperties.put(key, value);
-      return this;
-    }
-
-    public Builder addSystemProperties(final Properties config) {
-      this.systemProperties.putAll(config);
-      return this;
-    }
-
     public CacheRule build() {
       return new CacheRule(this);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.