You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/05 17:25:49 UTC

[1/4] geode git commit: Create ClientCachePutBench

Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632 [created] 135d674ac


Create ClientCachePutBench


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1881851d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1881851d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1881851d

Branch: refs/heads/feature/GEODE-2632
Commit: 1881851d999cec72d66be98745ff172eec326627
Parents: 6ce55a0
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Apr 3 14:52:28 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Mon Apr 3 14:52:57 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   5 +-
 geode-core/build.gradle                         |   5 +
 .../sockets/command/ClientCachePutBench.java    | 174 +++++++++++++++++++
 .../cache/tier/sockets/command/Put65Bench.java  | 120 +++++++++++++
 .../command/ClientCachePutBench-server.xml      |  29 ++++
 .../internal/ClusterConfigurationService.java   |  20 ++-
 .../cache/tier/sockets/BaseCommand.java         |  35 ++--
 .../tier/sockets/ClientProxyMembershipID.java   |   5 +-
 .../geode/distributed/ServerLauncherUtils.java  |  30 ++++
 .../cache/tier/sockets/CacheServerUtils.java    |  55 ++++++
 .../command/ExperimentIntegrationTest.java      |  80 +++++++++
 .../tier/sockets/command/ExperimentTest.java    | 121 +++++++++++++
 geode-web/src/main/webapp/WEB-INF/web.xml       |   2 +-
 gradle/rat.gradle                               |   2 +
 14 files changed, 652 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 38c8131..31d5996 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,7 +12,8 @@ build/
 .idea/
 build-eclipse/
 /tags
-
+out/
+bin/
 
 
 *.iml
@@ -27,4 +28,4 @@ build-eclipse/
 *.orig
 geode-spark-connector/**/target/
 geode-spark-connector/project/project/
-geode-pulse/screenshots/
\ No newline at end of file
+geode-pulse/screenshots/

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/build.gradle
----------------------------------------------------------------------
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 757599a..fd56fe1 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -17,6 +17,7 @@
 
 
 apply plugin: 'antlr'
+apply plugin: 'me.champeau.gradle.jmh'
 
 sourceSets {
   jca {
@@ -220,5 +221,9 @@ dependencies {
   classesOutput sourceSets.main.output
 }
 
+jmh {
+  duplicateClassesStrategy = 'warn'
+}
+
 tasks.eclipse.dependsOn(generateGrammarSource)
 

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
new file mode 100644
index 0000000..a1cbd81
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.commons.io.FileUtils.*;
+import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.net.SocketCreator;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JMH throughput benchmark that measures {@code Region.put} issued from a {@link ClientCache}
+ * against a cache server running in a separately forked JVM.
+ */
+@Measurement(iterations = 5)
+@Warmup(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Thread)
+@SuppressWarnings("unused")
+public class ClientCachePutBench {
+
+  /**
+   * Returns the classpath resource name of the cache XML used to configure the forked server. The
+   * resource lives in the same package as this class.
+   */
+  private static String serverXmlResourceName() {
+    return "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+        + "/ClientCachePutBench-server.xml";
+  }
+
+  /**
+   * Sanity check that the server cache XML can be located on the classpath.
+   */
+  @Test
+  public void tempTest() throws Exception {
+    assertThat(new File(getClass().getResource(serverXmlResourceName()).getFile())).exists();
+  }
+
+  /**
+   * Benchmark-scoped state: forks the server JVM, connects a client cache to it and exposes the
+   * proxy region that the benchmark writes to.
+   */
+  @State(Scope.Benchmark)
+  public static class ClientState {
+    public static final String REGION_NAME = "clientCachePutBench-region";
+
+    /** Maximum time to wait for the forked server to accept connections on its port. */
+    private static final long SERVER_START_TIMEOUT_SECONDS = 120;
+    /** Pause between connection attempts while waiting for the forked server. */
+    private static final long SERVER_START_POLL_MILLIS = 250;
+    /** Connect timeout of a single probe of the server port. */
+    private static final int SERVER_CONNECT_TIMEOUT_MILLIS = 1000;
+    /** Maximum time to wait for the forked JVM to exit once it has been destroyed. */
+    private static final long SERVER_EXIT_TIMEOUT_SECONDS = 30;
+
+    public Random random;
+
+    public int serverPort;
+    public Process process;
+    public ServerLauncher launcher;
+    public File serverDirectory;
+
+    public ClientCache clientCache;
+    public Region<String, String> region;
+
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    /**
+     * Copies the server cache XML into a temporary directory, forks a server JVM that uses it,
+     * waits until the server port accepts connections and then creates the client cache and its
+     * proxy region.
+     *
+     * @throws Exception if the resources cannot be prepared, the forked JVM dies, or the server
+     *         does not start listening within the timeout
+     */
+    @Setup(Level.Trial)
+    public void startServer() throws Exception {
+      this.random = new Random(System.nanoTime());
+
+      this.temporaryFolder.create();
+      this.serverDirectory = this.temporaryFolder.getRoot();
+
+      String serverXmlResource = serverXmlResourceName();
+      URL srcServerXml = getClass().getResource(serverXmlResource);
+      assertThat(srcServerXml).isNotNull();
+      File destServerXml = new File(this.serverDirectory, serverXmlResource);
+      copyURLToFile(srcServerXml, destServerXml);
+
+      this.serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+      List<String> command = new ArrayList<>();
+      command.add(
+          new File(new File(System.getProperty("java.home"), "bin"), "java").getCanonicalPath());
+      command.addAll(getJvmArguments());
+      command.add("-Dgemfire.cache-xml-file=" + destServerXml.getAbsolutePath());
+      command.add("-cp");
+      command.add(System.getProperty("java.class.path"));
+      command.add(ServerLauncher.class.getName());
+      command.add(ServerLauncher.Command.START.getName());
+      command.add("server1");
+      command.add("--server-port=" + this.serverPort);
+
+      // Send the child's stdout/stderr to a file: nobody reads the default pipes, so the forked
+      // JVM could stall once the pipe buffer fills up.
+      this.process = new ProcessBuilder(command).directory(this.serverDirectory)
+          .redirectErrorStream(true)
+          .redirectOutput(new File(this.serverDirectory, "server1-stdout.txt")).start();
+
+      awaitServerStart();
+
+      this.clientCache =
+          new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create();
+      this.region =
+          this.clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+              .create(REGION_NAME);
+    }
+
+    /**
+     * Closes the client cache, asks the server to stop through its working directory, then kills
+     * the forked JVM and removes the temporary directory regardless of earlier failures.
+     */
+    @TearDown(Level.Trial)
+    public void stopServer() throws Exception {
+      try {
+        // clientCache is still null when startServer failed before creating it
+        if (this.clientCache != null) {
+          this.clientCache.close(false);
+        }
+        new ServerLauncher.Builder().setWorkingDirectory(this.serverDirectory.getAbsolutePath())
+            .build().stop();
+      } finally {
+        try {
+          if (this.process != null) {
+            // wait for the JVM to exit so it no longer holds files in the temporary directory
+            this.process.destroyForcibly().waitFor(SERVER_EXIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+          }
+        } finally {
+          this.temporaryFolder.delete();
+        }
+      }
+    }
+
+    /**
+     * Blocks until the forked server accepts a TCP connection on {@link #serverPort}.
+     *
+     * @throws IllegalStateException if the port is still not accepting connections when the
+     *         timeout elapses; the forked JVM is destroyed before throwing
+     */
+    private void awaitServerStart() throws Exception {
+      final long deadlineNanos =
+          System.nanoTime() + TimeUnit.SECONDS.toNanos(SERVER_START_TIMEOUT_SECONDS);
+      while (true) {
+        // fail fast if the forked JVM died instead of waiting out the whole timeout
+        assertThat(this.process.isAlive()).isTrue();
+        try (java.net.Socket probe = new java.net.Socket()) {
+          probe.connect(new java.net.InetSocketAddress(getIPLiteral(), this.serverPort),
+              SERVER_CONNECT_TIMEOUT_MILLIS);
+          return;
+        } catch (java.io.IOException notListeningYet) {
+          if (System.nanoTime() - deadlineNanos >= 0) {
+            this.process.destroyForcibly();
+            throw new IllegalStateException("Server did not accept connections on port "
+                + this.serverPort + " within " + SERVER_START_TIMEOUT_SECONDS + " seconds",
+                notListeningYet);
+          }
+        }
+        Thread.sleep(SERVER_START_POLL_MILLIS);
+      }
+    }
+
+    /**
+     * Builds the system properties passed to the forked server JVM: multicast port 0 and an empty
+     * locators list.
+     */
+    private List<String> getJvmArguments() {
+      List<String> jvmArguments = new ArrayList<>();
+      jvmArguments.add(
+          "-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.MCAST_PORT + "=0");
+      // ProcessBuilder does no shell parsing, so an empty value is just "name=" without quotes
+      jvmArguments
+          .add("-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.LOCATORS + "=");
+      return jvmArguments;
+    }
+  }
+
+  /**
+   * Puts one randomly generated key/value pair into the client region.
+   *
+   * @param state shared client state holding the region and the random source
+   * @param blackhole sink for the value returned by the put
+   */
+  @Benchmark
+  public void test(ClientState state, Blackhole blackhole) throws Exception {
+    String key = "key-" + state.random.nextInt();
+    String value = "value-" + state.random.nextInt();
+    String oldValue = state.region.put(key, value);
+    // key and value are already consumed by put; only the result needs to be sunk
+    blackhole.consume(oldValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
new file mode 100644
index 0000000..6ccd8c3
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+public class Put65Bench {
+
+  @State(Scope.Benchmark)
+  public static class ServerConnectionState {
+    public Command command;
+    public ServerConnection mockServerConnection;
+    public Message message;
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+      loadEmergencyClasses();
+
+      this.command = Put65.getCommand();
+
+      this.mockServerConnection = mock(ServerConnection.class);
+      when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+      TXManagerImpl txManager = mock(TXManagerImpl.class);
+      GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+      when(cache.getTxManager()).thenReturn(txManager);
+
+      when(this.mockServerConnection.getCache()).thenReturn(cache);
+
+      CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+      when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+
+      // .getDistributedMember()
+      ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+      when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+      Message errorResponseMessage = mock(Message.class);
+      when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+
+      Part regionNamePart = mock(Part.class);
+      when(regionNamePart.getString()).thenReturn("regionNamePart");
+
+      Part operationPart = mock(Part.class);
+      when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+
+      Part flagsPart = mock(Part.class);
+      when(flagsPart.getInt()).thenReturn(0);
+
+      Part keyPart = mock(Part.class);
+      when(keyPart.getObject()).thenReturn("keyPart");
+      when(keyPart.getStringOrObject()).thenReturn("keyPart");
+
+      Part isDeltaPart = mock(Part.class);
+      when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+      Part valuePart = mock(Part.class);
+      when(valuePart.getObject()).thenReturn("valuePart");
+
+      Part eventPart = mock(Part.class);
+      when(eventPart.getObject()).thenReturn("eventPart");
+
+      Part callbackArgPart = mock(Part.class);
+      when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+
+      message = mock(Message.class);
+
+      when(message.getTransactionId()).thenReturn(NOTX);
+
+      when(message.getPart(0)).thenReturn(regionNamePart);
+      when(message.getPart(1)).thenReturn(operationPart);
+      when(message.getPart(2)).thenReturn(flagsPart);
+      when(message.getPart(3)).thenReturn(keyPart);
+      when(message.getPart(4)).thenReturn(isDeltaPart);
+      when(message.getPart(5)).thenReturn(valuePart);
+      when(message.getPart(6)).thenReturn(eventPart);
+      when(message.getPart(7)).thenReturn(callbackArgPart);
+    }
+  }
+
+  // @Benchmark
+  public void benchmark(ServerConnectionState state, Blackhole blackhole) {
+    state.command.execute(state.message, state.mockServerConnection);
+    // Message replyMessage = state.mockServerConnection.getReplyMessage();
+    // blackhole.consume(replyMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
new file mode 100644
index 0000000..2013b37
--- /dev/null
+++ b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one or more
+	contributor license agreements.  See the NOTICE file distributed with
+	this work for additional information regarding copyright ownership.
+	The ASF licenses this file to You under the Apache License, Version 2.0
+	(the "License"); you may not use this file except in compliance with
+	the License.  You may obtain a copy of the License at
+
+	     http://www.apache.org/licenses/LICENSE-2.0
+
+	Unless required by applicable law or agreed to in writing, software
+	distributed under the License is distributed on an "AS IS" BASIS,
+	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+	See the License for the specific language governing permissions and
+	limitations under the License.
+-->
+<cache
+        xmlns="http://geode.apache.org/schema/cache"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+        version="1.0">
+    <region name="clientCachePutBench-region" refid="REPLICATE">
+        <region-attributes>
+            <key-constraint>java.lang.String</key-constraint>
+            <value-constraint>java.lang.String</value-constraint>
+        </region-attributes>
+    </region>
+</cache>

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 95d1a5b..74df19c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -386,10 +386,22 @@ public class ClusterConfigurationService {
 
     createConfigDirIfNecessary(groupName);
 
-    byte[] jarBytes = locators.stream()
-        .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
-        .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
-            "No locators have a deployed jar named " + jarName + " in " + groupName));
+    byte[] jarBytes = null;
+    for (DistributedMember locator : locators) {
+      jarBytes = downloadJarFromLocator(locator, groupName, jarName);
+      if (jarBytes != null) {
+        break;
+      }
+    }
+    if (jarBytes == null) {
+      throw new IllegalStateException(
+          "No locators have a deployed jar named " + jarName + " in " + groupName);
+    }
+
+    // byte[] jarBytes = locators.stream()
+    // .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
+    // .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
+    // "No locators have a deployed jar named " + jarName + " in " + groupName));
 
     File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
     FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index d217672..ff9daca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -91,31 +91,26 @@ public abstract class BaseCommand implements Command {
   private static final int MAX_INCOMING_MSGS =
       Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
 
-  private static final Semaphore incomingDataLimiter;
-
-  private static final Semaphore incomingMsgLimiter;
-  static {
-    Semaphore tmp;
-    if (MAX_INCOMING_DATA > 0) {
-      // backport requires that this is fair since we inc by values > 1
-      tmp = new Semaphore(MAX_INCOMING_DATA, true);
-    } else {
-      tmp = null;
-    }
-    incomingDataLimiter = tmp;
-    if (MAX_INCOMING_MSGS > 0) {
-      tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
-      // performance
-    } else {
-      tmp = null;
-    }
-    incomingMsgLimiter = tmp;
+  // backport requires that this is fair since we inc by values > 1
+  private static final Semaphore incomingDataLimiter =
+      createIncomingLimiterSemaphore(MAX_INCOMING_DATA, true);
+
+  // unfair for best performance
+  private static final Semaphore incomingMsgLimiter =
+      createIncomingLimiterSemaphore(MAX_INCOMING_MSGS, false);
 
+  private static Semaphore createIncomingLimiterSemaphore(final int maximum, final boolean fair) {
+    Semaphore semaphore = null;
+    if (maximum > 0) {
+      semaphore = new Semaphore(maximum, fair);
+    }
+    return semaphore;
   }
 
   protected SecurityService securityService = IntegratedSecurityService.getSecurityService();
 
-  final public void execute(Message msg, ServerConnection servConn) {
+  @Override
+  public void execute(final Message msg, final ServerConnection servConn) {
     // Read the request and update the statistics
     long start = DistributionStats.getStatTime();
     // servConn.resetTransientData();

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..46e43c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -34,11 +34,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
 
 /**
  * This class represents a ConnectionProxy of the CacheClient
- * 
- * 
- * 
  */
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
     implements DataSerializableFixedID, Serializable, Externalizable {
 
   private static final Logger logger = LogService.getLogger();

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
new file mode 100644
index 0000000..017e0f5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed;
+
+import org.apache.geode.cache.Cache;
+
+/**
+ * Test helper that gives tests access to non-public state of {@link ServerLauncher}.
+ */
+public class ServerLauncherUtils {
+
+  /**
+   * Looks up the Cache of an online in-process ServerLauncher instance.
+   *
+   * @param serverLauncher the launcher to query
+   * @return whatever the launcher's {@code getCache()} reports
+   */
+  public static Cache getCache(final ServerLauncher serverLauncher) {
+    final Cache launcherCache = serverLauncher.getCache();
+    return launcherCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
new file mode 100644
index 0000000..8cd7622
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.CacheServerImpl;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test helper that gives tests access to CacheServer, AcceptorImpl and ServerConnection instances.
+ */
+public class CacheServerUtils {
+
+  /**
+   * Returns the first CacheServer registered with the specified Cache.
+   *
+   * @throws IndexOutOfBoundsException if the cache has no cache servers
+   */
+  public static CacheServer getCacheServer(final Cache cache) {
+    final List<CacheServer> servers = cache.getCacheServers();
+    return servers.get(0);
+  }
+
+  /**
+   * Returns the AcceptorImpl of the specified CacheServer, which must be a CacheServerImpl.
+   */
+  public static AcceptorImpl getAcceptorImpl(final CacheServer cacheServer) {
+    final CacheServerImpl serverImpl = (CacheServerImpl) cacheServer;
+    return serverImpl.getAcceptor();
+  }
+
+  /**
+   * Returns the first ServerConnection reported by the acceptor of the specified CacheServer.
+   *
+   * @throws java.util.NoSuchElementException if the acceptor has no server connections
+   */
+  public static ServerConnection getServerConnection(final CacheServer cacheServer) {
+    final Set<ServerConnection> connections =
+        getAcceptorImpl(cacheServer).getAllServerConnections();
+    return connections.iterator().next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
new file mode 100644
index 0000000..2d900dc
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+/**
+ * Experimental integration test that starts an in-process server and builds a ServerConnection
+ * around a mocked client socket.
+ */
+@Category(IntegrationTest.class)
+public class ExperimentIntegrationTest {
+
+  private ServerLauncher serverLauncher;
+  private ServerConnection serverConnection;
+
+  /**
+   * Starts an in-process server on a random available port and creates the ServerConnection used
+   * by the tests.
+   */
+  @Before
+  public void before() throws Exception {
+    int serverPort = getRandomAvailablePort(SOCKET);
+
+    this.serverLauncher =
+        new ServerLauncher.Builder().setMemberName("server").setServerPort(serverPort).build();
+    this.serverLauncher.start();
+
+    Cache cache = getCache(this.serverLauncher);
+    CacheServer cacheServer = getCacheServer(cache);
+    AcceptorImpl acceptor = getAcceptorImpl(cacheServer);
+
+    Socket mockSocket = mock(Socket.class);
+    when(mockSocket.getInetAddress()).thenReturn(SocketCreator.getLocalHost());
+
+    this.serverConnection =
+        new ServerConnection(mockSocket, cache, null, null, DEFAULT_HANDSHAKE_TIMEOUT_MS,
+            CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, "client", Acceptor.CLIENT_TO_SERVER, acceptor);
+
+    preConditions();
+  }
+
+  /**
+   * Stops the server started in {@link #before()} so it does not leak into later tests.
+   */
+  @org.junit.After
+  public void after() throws Exception {
+    // the launcher is still null when before() failed prior to building it
+    if (this.serverLauncher != null) {
+      this.serverLauncher.stop();
+    }
+  }
+
+  /**
+   * Verifies that the launcher reports the server as online.
+   */
+  public void preConditions() throws Exception {
+    assertThat(this.serverLauncher.status().getStatus()).isEqualTo(ONLINE);
+  }
+
+  /**
+   * Executes a mocked command with a mocked message against the prepared ServerConnection.
+   */
+  @Test
+  public void handlePutFromFakeClient() throws Exception {
+    Message message = mock(Message.class);
+    Command command = mock(Command.class);
+    command.execute(message, this.serverConnection);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
new file mode 100644
index 0000000..b52e81d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(UnitTest.class)
+public class ExperimentTest {
+
+  private ServerConnection mockServerConnection;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void before() throws Exception {
+    this.mockServerConnection = mock(ServerConnection.class);
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    when(cache.getTxManager()).thenReturn(txManager);
+
+    when(this.mockServerConnection.getCache()).thenReturn(cache);
+
+    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+    when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+
+    // .getDistributedMember()
+    ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+    Message errorResponseMessage = mock(Message.class);
+    when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+  }
+
+  @Test
+  public void handlePutFromFakeClient() throws Exception {
+    Part regionNamePart = mock(Part.class);
+    when(regionNamePart.getString()).thenReturn("regionNamePart");
+
+    Part operationPart = mock(Part.class);
+    when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part flagsPart = mock(Part.class);
+    when(flagsPart.getInt()).thenReturn(0);
+
+    Part keyPart = mock(Part.class);
+    when(keyPart.getObject()).thenReturn("keyPart");
+    when(keyPart.getStringOrObject()).thenReturn("keyPart");
+
+    Part isDeltaPart = mock(Part.class);
+    when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part valuePart = mock(Part.class);
+    when(valuePart.getObject()).thenReturn("valuePart");
+
+    Part eventPart = mock(Part.class);
+    when(eventPart.getObject()).thenReturn("eventPart");
+
+    Part callbackArgPart = mock(Part.class);
+    when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+
+    Message message = mock(Message.class);
+
+    when(message.getTransactionId()).thenReturn(NOTX);
+
+    when(message.getPart(0)).thenReturn(regionNamePart);
+    when(message.getPart(1)).thenReturn(operationPart);
+    when(message.getPart(2)).thenReturn(flagsPart);
+    when(message.getPart(3)).thenReturn(keyPart);
+    when(message.getPart(4)).thenReturn(isDeltaPart);
+    when(message.getPart(5)).thenReturn(valuePart);
+    when(message.getPart(6)).thenReturn(eventPart);
+    when(message.getPart(7)).thenReturn(callbackArgPart);
+
+    assertThat(message.getPart(0)).isSameAs(regionNamePart);
+    assertThat(message.getPart(1)).isSameAs(operationPart);
+    assertThat(message.getPart(2)).isSameAs(flagsPart);
+    assertThat(message.getPart(3)).isSameAs(keyPart);
+    assertThat(message.getPart(4)).isSameAs(isDeltaPart);
+    assertThat(message.getPart(5)).isSameAs(valuePart);
+    assertThat(message.getPart(6)).isSameAs(eventPart);
+    assertThat(message.getPart(7)).isSameAs(callbackArgPart);
+
+    Command command = Put65.getCommand();
+    command.execute(message, this.mockServerConnection);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/geode-web/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/geode-web/src/main/webapp/WEB-INF/web.xml b/geode-web/src/main/webapp/WEB-INF/web.xml
index 873d675..ff24e80 100644
--- a/geode-web/src/main/webapp/WEB-INF/web.xml
+++ b/geode-web/src/main/webapp/WEB-INF/web.xml
@@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
-<web-app xmlns="http://java.sun.com/xml/ns/j2ee"
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
          version="3.0">

http://git-wip-us.apache.org/repos/asf/geode/blob/1881851d/gradle/rat.gradle
----------------------------------------------------------------------
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index c97a9e9..7bea470 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -62,6 +62,8 @@ rat {
     '.idea/**',
     'geode-spark-connector/.idea/**',
     '**/tags',
+    '**/out/**',
+    '**/bin/**',
 
     // text files
     '**/*.fig',


[3/4] geode git commit: WIP refactoring

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7b923c..5370e2f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -43,7 +43,6 @@ import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.ClientSession;
 import org.apache.geode.cache.DynamicRegionFactory;
@@ -119,155 +118,128 @@ public class CacheClientProxy implements ClientSession {
   private static final Logger logger = LogService.getLogger();
 
   /**
-   * The socket between the server and the client
-   */
-  protected Socket _socket;
-
-  private final AtomicBoolean _socketClosed = new AtomicBoolean();
-
-  /**
-   * A communication buffer used by each message we send to the client
-   */
-  protected ByteBuffer _commBuffer;
-
-  /**
-   * The remote host's IP address string (cached for convenience)
-   */
-  protected String _remoteHostAddress;
-
-  /**
-   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+   * Notify the region when a client interest registration occurs. This tells the region to update
+   * access time when an update is to be pushed to a client. It is enabled only for
+   * <code>PartitionedRegion</code>s currently.
    */
-  protected volatile boolean isMarkedForRemoval = false;
+  private static final boolean NOTIFY_REGION_ON_INTEREST =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
 
   /**
-   * @see #isMarkedForRemoval
+   * The number of times to peek on shutdown before giving up and shutting down
    */
-  protected final Object isMarkedForRemovalLock = new Object();
+  private static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
+      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50);
 
   /**
-   * The proxy id of the client represented by this proxy
+   * Default value for slow starting time of dispatcher
    */
-  protected ClientProxyMembershipID proxyID;
+  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
 
   /**
-   * The GemFire cache
+   * Key in the system property from which the slow starting time value will be retrieved
    */
-  protected final GemFireCacheImpl _cache;
+  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
 
   /**
-   * The list of keys that the client represented by this proxy is interested in (stored by region)
+   * TODO: delete this and rewrite the tests that use this
+   * NOTE: this is NOT thread safe
    */
-  protected final ClientInterestList[] cils = new ClientInterestList[2];
+  private static TestHook testHook;
 
   /**
-   * A thread that dispatches messages to the client
+   * TODO: delete this and rewrite the test that uses this
+   * A debug flag used for testing Backward compatibility
    */
-  protected volatile MessageDispatcher _messageDispatcher;
+  private static boolean afterMessageCreationForTesting = false;
 
   /**
-   * The statistics for this proxy
+   * TODO: delete this and rewrite the test that uses this
+   * for testing purposes, delays the start of the dispatcher thread
    */
-  protected final CacheClientProxyStats _statistics;
+  private static boolean isSlowStartForTesting = false;
 
-  protected final AtomicReference _durableExpirationTask = new AtomicReference();
-
-  protected SystemTimer durableTimer;
+  private final AtomicBoolean socketClosed = new AtomicBoolean();
 
   /**
-   * Whether this dispatcher is paused
+   * @see #isMarkedForRemoval
    */
-  protected volatile boolean _isPaused = true;
+  private final Object isMarkedForRemovalLock = new Object();
 
   /**
-   * True if we are connected to a client.
-   */
-  private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
-  /**
-   * True if a marker message is still in the ha queue.
+   * The GemFire cache
    */
-  private boolean markerEnqueued = false;
+  private final GemFireCacheImpl cache;
 
   /**
-   * The number of times to peek on shutdown before giving up and shutting down
+   * The list of keys that the client represented by this proxy is interested in (stored by region)
    */
-  protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue();
+  private final ClientInterestList[] cils = new ClientInterestList[2];
 
   /**
-   * The number of milliseconds to wait for an offering to the message queue
+   * The statistics for this proxy
    */
-  protected static final int MESSAGE_OFFER_TIME = 0;
+  private final CacheClientProxyStats _statistics;
 
-  /**
-   * The default maximum message queue size
-   */
-  // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+  private final AtomicReference _durableExpirationTask = new AtomicReference();
 
   /** The message queue size */
-  protected final int _maximumMessageCount;
+  private final int _maximumMessageCount;
 
   /**
    * The time (in seconds ) after which a message in the client queue will expire.
    */
-  protected final int _messageTimeToLive;
+  private final int _messageTimeToLive;
 
   /**
    * The <code>CacheClientNotifier</code> registering this proxy.
    */
-  protected final CacheClientNotifier _cacheClientNotifier;
+  private final CacheClientNotifier cacheClientNotifier;
 
-  /**
-   * Defaults to true; meaning do some logging of dropped client notification messages. Set the
-   * system property to true to cause dropped messages to NOT be logged.
-   */
-  protected static final boolean LOG_DROPPED_MSGS =
-      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings");
+  private final Object clientUserAuthsLock = new Object();
 
   /**
-   * for testing purposes, delays the start of the dispatcher thread
+   * The AcceptorImpl identifier to which the proxy is connected.
    */
-  public static boolean isSlowStartForTesting = false;
+  private final long _acceptorId;
 
-  /**
-   * Default value for slow starting time of dispatcher
-   */
-  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+  /** acceptor's setting for notifyBySubscription */
+  private final boolean notifyBySubscription;
+
+  private final Object queuedEventsSync = new Object();
 
   /**
-   * Key in the system property from which the slow starting time value will be retrieved
+   * A counter that keeps track of how many task iterations that have occurred since the last ping
+   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
+   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
    */
-  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+  private final AtomicInteger pingCounter = new AtomicInteger();
 
-  private boolean isPrimary;
+  private final Object drainLock = new Object();
 
-  /** @since GemFire 5.7 */
-  protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+  private final Object drainsInProgressLock = new Object();
+
+  private final SecurityService securityService;
 
   /**
-   * Flag to indicate whether to keep a durable client's queue alive
+   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
    */
-  boolean keepalive = false;
-
-  private AccessControl postAuthzCallback;
-  private Subject subject;
+  private volatile boolean isMarkedForRemoval = false;
 
   /**
-   * For multiuser environment..
+   * A thread that dispatches messages to the client
    */
-  private ClientUserAuths clientUserAuths;
+  private volatile MessageDispatcher _messageDispatcher;
 
-  private final Object clientUserAuthsLock = new Object();
+  /**
+   * Whether this dispatcher is paused
+   */
+  private volatile boolean _isPaused = true;
 
   /**
-   * The version of the client
+   * True if we are connected to a client.
    */
-  private Version clientVersion;
+  private volatile boolean connected = false;
 
   /**
    * A map of region name as key and integer as its value. Basically, it stores the names of the
@@ -278,42 +250,60 @@ public class CacheClientProxy implements ClientSession {
    */
   private volatile Map regionsWithEmptyDataPolicy = new HashMap();
 
+  /** To queue the events arriving during message dispatcher initialization */
+  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
+      new ConcurrentLinkedQueue<Conflatable>();
+
+  private volatile boolean messageDispatcherInit = false;
+
   /**
-   * A debug flag used for testing Backward compatibility
+   * The socket between the server and the client
    */
-  public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+  private Socket socket;
 
   /**
-   * Notify the region when a client interest registration occurs. This tells the region to update
-   * access time when an update is to be pushed to a client. It is enabled only for
-   * <code>PartitionedRegion</code>s currently.
+   * A communication buffer used by each message we send to the client
    */
-  protected static final boolean NOTIFY_REGION_ON_INTEREST =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
+  private ByteBuffer _commBuffer;
 
   /**
-   * The AcceptorImpl identifier to which the proxy is connected.
+   * The remote host's IP address string (cached for convenience)
    */
-  private final long _acceptorId;
+  private String _remoteHostAddress;
 
-  /** acceptor's setting for notifyBySubscription */
-  private final boolean notifyBySubscription;
+  /**
+   * The proxy id of the client represented by this proxy
+   */
+  private ClientProxyMembershipID proxyID;
 
-  /** To queue the events arriving during message dispatcher initialization */
-  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
-      new ConcurrentLinkedQueue<Conflatable>();
+  /**
+   * True if a marker message is still in the ha queue.
+   */
+  private boolean markerEnqueued = false;
 
-  private final Object queuedEventsSync = new Object();
+  private boolean isPrimary;
 
-  private volatile boolean messageDispatcherInit = false;
+  /** @since GemFire 5.7 */
+  private byte clientConflation = HandShake.CONFLATION_DEFAULT;
 
   /**
-   * A counter that keeps track of how many task iterations that have occurred since the last ping
-   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
-   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
+   * Flag to indicate whether to keep a durable client's queue alive
    */
-  private final AtomicInteger pingCounter = new AtomicInteger();
+  private boolean keepalive = false;
 
+  private AccessControl postAuthzCallback;
+
+  private Subject subject;
+
+  /**
+   * For multiuser environment..
+   */
+  private ClientUserAuths clientUserAuths;
+
+  /**
+   * The version of the client
+   */
+  private Version clientVersion;
 
   /** Date on which this instances was created */
   private Date creationDate;
@@ -321,52 +311,87 @@ public class CacheClientProxy implements ClientSession {
   /**
    * true when the durable client associated with this proxy is being restarted and prevents cqs
    * from being closed and drained
-   **/
+   */
   private boolean drainLocked = false;
-  private final Object drainLock = new Object();
 
   /** number of cq drains that are currently in progress **/
   private int numDrainsInProgress = 0;
-  private final Object drainsInProgressLock = new Object();
 
-  private SecurityService securityService = SecurityService.getSecurityService();
+  static CacheClientProxy createCacheClientProxy(final CacheClientNotifier ccn,
+                                                 final GemFireCacheImpl cache,
+                                                 final StatisticsFactory statsFactory,
+                                                 final SecurityService securityService,
+                                                 final Socket socket,
+                                                 final ClientProxyMembershipID proxyID,
+                                                 final boolean isPrimary,
+                                                 final byte clientConflation,
+                                                 final Version clientVersion,
+                                                 final long acceptorId,
+                                                 final boolean notifyBySubscription) {
+
+    CacheClientProxy cacheClientProxy = new CacheClientProxy(
+        ccn, cache, statsFactory, securityService, socket, proxyID, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
+
+    // Create the interest list
+    cacheClientProxy.cils[RegisterInterestTracker.interestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.proxyID);
+    // Create the durable interest list
+    cacheClientProxy.cils[RegisterInterestTracker.durableInterestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.getDurableId());
+
+    return cacheClientProxy;
+  }
 
   /**
    * Constructor.
    *
    * @param ccn The <code>CacheClientNotifier</code> registering this proxy
+   * @param cache
    * @param socket The socket between the server and the client
    * @param proxyID representing the Connection Proxy of the clien
    * @param isPrimary The boolean stating whether this prozxy is primary
-   * @throws CacheException {
-   */
-  protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
-      ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
-      Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException {
+   * @param clientConflation
+   * @param clientVersion
+   */
+  private CacheClientProxy(final CacheClientNotifier ccn,
+                           final GemFireCacheImpl cache,
+                           final StatisticsFactory statsFactory,
+                           final SecurityService securityService,
+                           final Socket socket,
+                           final ClientProxyMembershipID proxyID,
+                           final boolean isPrimary,
+                           final byte clientConflation,
+                           final Version clientVersion,
+                           final long acceptorId,
+                           final boolean notifyBySubscription)
+      throws CacheException {
     initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
-    this._cacheClientNotifier = ccn;
-    this._cache = (GemFireCacheImpl) ccn.getCache();
+    this.cacheClientNotifier = ccn;
+    this.cache = cache;
+    this.securityService = securityService;
     this._maximumMessageCount = ccn.getMaximumMessageCount();
     this._messageTimeToLive = ccn.getMessageTimeToLive();
     this._acceptorId = acceptorId;
     this.notifyBySubscription = notifyBySubscription;
-    StatisticsFactory factory = this._cache.getDistributedSystem();
+
     this._statistics =
-        new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId()
-            + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
+        new CacheClientProxyStats(statsFactory, "id_" + this.proxyID.getDistributedMember().getId()
+            + "_at_" + this._remoteHostAddress + ":" + this.socket.getPort());
 
-    // Create the interest list
-    this.cils[RegisterInterestTracker.interestListIndex] =
-        new ClientInterestList(this, this.proxyID);
-    // Create the durable interest list
-    this.cils[RegisterInterestTracker.durableInterestListIndex] =
-        new ClientInterestList(this, this.getDurableId());
     this.postAuthzCallback = null;
-    this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+    this.cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
     this.creationDate = new Date();
     initializeClientAuths();
   }
 
+  /**
+   * Returns whether this proxy's client conflation setting equals
+   * <code>HandShake.CONFLATION_ON</code>.
+   */
+  boolean isClientConflationOn() {
+    return HandShake.CONFLATION_ON == this.clientConflation;
+  }
+
+  /**
+   * Returns whether this proxy's client conflation setting equals
+   * <code>HandShake.CONFLATION_DEFAULT</code>, the value the field is initialized with.
+   */
+  boolean isClientConflationDefault() {
+    // Compare against CONFLATION_DEFAULT; comparing against CONFLATION_ON would make this method
+    // indistinguishable from isClientConflationOn().
+    return this.clientConflation == HandShake.CONFLATION_DEFAULT;
+  }
+
   private void initializeClientAuths() {
     if (AcceptorImpl.isPostAuthzCallbackPresent())
       this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
@@ -411,13 +436,13 @@ public class CacheClientProxy implements ClientSession {
 
   private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
       byte cc, Version vers) {
-    this._socket = socket;
+    this.socket = socket;
     this.proxyID = pid;
     this.connected = true;
     {
       int bufSize = 1024;
       try {
-        bufSize = _socket.getSendBufferSize();
+        bufSize = this.socket.getSendBufferSize();
         if (bufSize < 1024) {
           bufSize = 1024;
         }
@@ -450,7 +475,6 @@ public class CacheClientProxy implements ClientSession {
     return this.notifyBySubscription;
   }
 
-
   /**
    * Returns the DistributedMember represented by this proxy
    */
@@ -458,47 +482,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -522,11 +505,11 @@ public class CacheClientProxy implements ClientSession {
    * @return the socket between the server and the client
    */
   protected Socket getSocket() {
-    return this._socket;
+    return this.socket;
   }
 
   public String getSocketHost() {
-    return this._socket.getInetAddress().getHostAddress();
+    return this.socket.getInetAddress().getHostAddress();
   }
 
   protected ByteBuffer getCommBuffer() {
@@ -548,7 +531,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the remote host's port
    */
   public int getRemotePort() {
-    return this._socket.getPort();
+    return this.socket.getPort();
   }
 
   /**
@@ -593,7 +576,7 @@ public class CacheClientProxy implements ClientSession {
             this.isMarkedForRemovalLock.wait();
           } catch (InterruptedException e) {
             interrupted = true;
-            this._cache.getCancelCriterion().checkCancelInProgress(e);
+            this.cache.getCancelCriterion().checkCancelInProgress(e);
           }
         } // while
       } finally {
@@ -621,7 +604,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the GemFire cache
    */
   public GemFireCacheImpl getCache() {
-    return this._cache;
+    return this.cache;
   }
 
   public Set<String> getInterestRegisteredRegions() {
@@ -649,7 +632,7 @@ public class CacheClientProxy implements ClientSession {
    * @return this proxy's <code>CacheClientNotifier</code>
    */
   protected CacheClientNotifier getCacheClientNotifier() {
-    return this._cacheClientNotifier;
+    return this.cacheClientNotifier;
   }
 
   /**
@@ -852,8 +835,8 @@ public class CacheClientProxy implements ClientSession {
         }
       }
     } catch (Exception ex) {
-      if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
-        this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
+      if (this.cache.getSecurityLoggerI18n().warningEnabled()) {
+        this.cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
             new Object[] {this, ex});
       }
     }
@@ -991,9 +974,9 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
+    if (this.socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+      this.cacheClientNotifier.getSocketCloser().asyncClose(this.socket, this._remoteHostAddress,
           null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
@@ -1008,7 +991,7 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+        this.cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }
@@ -1124,7 +1107,7 @@ public class CacheClientProxy implements ClientSession {
       InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
     // Create a client interest message for the keyOfInterest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         policy.getOrdinal(), isDurable, !receiveValues, ClientInterestMessageImpl.REGISTER);
 
     // Notify all secondary proxies of a change in interest
@@ -1146,7 +1129,7 @@ public class CacheClientProxy implements ClientSession {
       String regionName, Object keyOfInterest) {
     // Get the initial value
     Get70 request = (Get70) Get70.getCommand();
-    LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+    LocalRegion lr = (LocalRegion) this.cache.getRegion(regionName);
     Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, null);
     boolean isObject = entry.isObject;
     byte[] value = null;
@@ -1170,7 +1153,7 @@ public class CacheClientProxy implements ClientSession {
       EventID eventId = null;
       if (clientInterestMessage == null) {
         // If the clientInterestMessage is null, create a new event id
-        eventId = new EventID(this._cache.getDistributedSystem());
+        eventId = new EventID(this.cache.getDistributedSystem());
       } else {
         // If the clientInterestMessage is not null, base the event id off its event id to fix
         // GEM-794.
@@ -1239,7 +1222,7 @@ public class CacheClientProxy implements ClientSession {
       boolean isDurable, boolean receiveValues, int interestType) {
     // Notify all secondary proxies of a change in interest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         (byte) 0, isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER);
     notifySecondariesOfInterestChange(message);
 
@@ -1269,17 +1252,9 @@ public class CacheClientProxy implements ClientSession {
           .append("->").append(InterestType.getString(message.getInterestType()));
       logger.debug(buffer.toString());
     }
-    this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+    this.cacheClientNotifier.deliverInterestChange(this.proxyID, message);
   }
 
-  /*
-   * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try {
-   * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch
-   * (RegionDestroyedException e) {
-   * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" +
-   * keyOfInterest, e); } }
-   */
-
   /**
    * Registers interest in the input region name and key
    *
@@ -1293,7 +1268,7 @@ public class CacheClientProxy implements ClientSession {
     cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
     if (flushState) {
       flushForInterestRegistration(regionName,
-          this._cache.getDistributedSystem().getDistributedMember());
+          this.cache.getDistributedSystem().getDistributedMember());
     }
     HARegionQueue queue = getHARegionQueue();
     if (queue != null) { // queue is null during initialization
@@ -1306,7 +1281,7 @@ public class CacheClientProxy implements ClientSession {
    * interest. During queue creation it is the queue's image provider.
    */
   public void flushForInterestRegistration(String regionName, DistributedMember target) {
-    Region r = this._cache.getRegion(regionName);
+    Region r = this.cache.getRegion(regionName);
     if (r == null) {
       if (logger.isDebugEnabled()) {
         logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
@@ -1320,7 +1295,7 @@ public class CacheClientProxy implements ClientSession {
       if (r instanceof PartitionedRegion) {
         // need to flush all buckets. SFO should be changed to target buckets
         // belonging to a particular PR, but it doesn't have that option right now
-        sfo = new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager());
+        sfo = new StateFlushOperation(this.cache.getDistributedSystem().getDistributionManager());
       } else {
         sfo = new StateFlushOperation((DistributedRegion) r);
       }
@@ -1378,7 +1353,7 @@ public class CacheClientProxy implements ClientSession {
     if (getHARegionQueue() != null) {
       if (flushState) {
         flushForInterestRegistration(regionName,
-            this._cache.getDistributedSystem().getDistributedMember());
+            this.cache.getDistributedSystem().getDistributedMember());
       }
       getHARegionQueue().setHasRegisteredInterest(true);
     }
@@ -1643,7 +1618,7 @@ public class CacheClientProxy implements ClientSession {
     if (logger.isDebugEnabled()) {
       logger.debug("About to send message directly to {}", this);
     }
-    if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+    if (this._messageDispatcher != null && this.socket != null && !this.socket.isClosed()) {
       // If the socket is open, send the message to it
       this._messageDispatcher.sendMessageDirectly(message);
       if (logger.isDebugEnabled()) {
@@ -1759,7 +1734,7 @@ public class CacheClientProxy implements ClientSession {
     if (this.isPrimary) {
       // Add the marker to the queue
       if (!processedMarker) {
-        EventID eventId = new EventID(this._cache.getDistributedSystem());
+        EventID eventId = new EventID(this.cache.getDistributedSystem());
         this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
       }
 
@@ -1811,13 +1786,8 @@ public class CacheClientProxy implements ClientSession {
   public String toString() {
     StringBuffer buffer = new StringBuffer();
     buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
         .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
+        .append("; port=").append(this.socket.getPort()).append("; primary=").append(isPrimary)
         .append("; version=").append(clientVersion).append("]");
     return buffer.toString();
   }
@@ -1825,13 +1795,8 @@ public class CacheClientProxy implements ClientSession {
   public String getState() {
     StringBuffer buffer = new StringBuffer();
     buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
         .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
+        .append("; port=").append(this.socket.getPort()).append("; primary=").append(isPrimary)
         .append("; version=").append(clientVersion).append("; paused=").append(isPaused())
         .append("; alive=").append(isAlive()).append("; connected=").append(isConnected())
         .append("; isMarkedForRemoval=").append(isMarkedForRemoval).append("]");
@@ -1844,15 +1809,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public boolean isPrimary() {
-    // boolean primary = this._messageDispatcher.isAlive()
-    // || this._messageDispatcher._messageQueue.isPrimary();
-    boolean primary = this.isPrimary;
-    // System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
-    // System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " +
-    // this._messageDispatcher._messageQueue.isPrimary());
-    // System.out.println(this + ": IS PRIMARY: " + primary);
-    return primary;
-    // return this.isPrimary ;
+    return this.isPrimary;
   }
 
   protected boolean basicIsPrimary() {
@@ -1863,13 +1820,7 @@ public class CacheClientProxy implements ClientSession {
     this.isPrimary = isPrimary;
   }
 
-  // private static int nextId = 0;
-  // static protected int getNextId() {
-  // synchronized (CacheClientProxy.class) {
-  // return ++nextId;
-  // }
-  // }
-  /*
+  /**
    * Return this client's HA region queue
    * 
    * @returns - HARegionQueue of the client
@@ -1881,7 +1832,6 @@ public class CacheClientProxy implements ClientSession {
     return null;
   }
 
-
   /**
    * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
    * 
@@ -1952,7 +1902,7 @@ public class CacheClientProxy implements ClientSession {
 
         // Close the proxy
         terminateDispatching(false);
-        _cacheClientNotifier._statistics.incQueueDroppedCount();
+        cacheClientNotifier._statistics.incQueueDroppedCount();
 
         /**
          * Setting the expiration task to null again and cancelling existing one, if any. See
@@ -1976,7 +1926,7 @@ public class CacheClientProxy implements ClientSession {
 
     };
     if (this._durableExpirationTask.compareAndSet(null, task)) {
-      _cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
+      cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
     }
   }
 
@@ -1992,11 +1942,131 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
+  public static void setTestHook(TestHook value) {
+    testHook = value;
+  }
+
+  public static void unsetTestHook() {
+    testHook = null;
+  }
+
+  public static TestHook getTestHook() {
+    return testHook;
+  }
+
+  static void setSlowStartForTesting() {
+    isSlowStartForTesting = true;
+  }
+
+  static void unsetSlowStartForTesting() {
+    isSlowStartForTesting = false;
+  }
+
+  static void setAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = true;
+  }
+
+  static void unsetAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = false;
+  }
+
+  Socket getSocketForTesting() {
+    return this.socket;
+  }
+
+  ClientInterestList[] getClientInterestListForTesting() {
+    return this.cils;
+  }
+
+  MessageDispatcher getMessageDispatcherForTesting() {
+    return this._messageDispatcher;
+  }
+
+  /**
+   * Returns the current number of CQs the client installed.
+   *
+   * @return int the current count of CQs for this client
+   */
+  public int getCqCount() {
+    synchronized (this) {
+      return this._statistics.getCqCount();
+    }
+  }
+
+  /**
+   * Increment the number of CQs the client installed
+   *
+   */
+  public void incCqCount() {
+    synchronized (this) {
+      this._statistics.incCqCount();
+    }
+  }
+
+  /**
+   * Decrement the number of CQs the client installed
+   *
+   */
+  public synchronized void decCqCount() {
+    synchronized (this) {
+      this._statistics.decCqCount();
+    }
+  }
+
+  /**
+   * Returns true if the client has one CQ
+   *
+   * @return true if the client has one CQ
+   */
+  public boolean hasOneCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 1;
+    }
+  }
+
+  /**
+   * Returns true if the client has no CQs
+   *
+   * @return true if the client has no CQs
+   */
+  public boolean hasNoCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 0;
+    }
+  }
+
+  /**
+   * Get map of regions with empty data policy
+   *
+   * @since GemFire 6.1
+   */
+  public Map getRegionsWithEmptyDataPolicy() {
+    return regionsWithEmptyDataPolicy;
+  }
+
+  public int incrementAndGetPingCounter() {
+    int pingCount = this.pingCounter.incrementAndGet();
+    return pingCount;
+  }
+
+  public void resetPingCounter() {
+    this.pingCounter.set(0);
+  }
+
+  /**
+   * Returns the number of seconds that have elapsed since the client proxy was created.
+   *
+   * @since GemFire 7.0
+   */
+  public long getUpTime() {
+    return (System.currentTimeMillis() - this.creationDate.getTime()) / 1000;
+  }
+
   /**
    * Class <code>ClientInterestList</code> provides a convenient interface for manipulating client
    * interest information.
    */
-  static protected class ClientInterestList {
+  static class ClientInterestList {
 
     final CacheClientProxy ccp;
 
@@ -2031,7 +2101,7 @@ public class CacheClientProxy implements ClientSession {
       }
       Set keysRegistered = null;
       synchronized (this.interestListLock) {
-        LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true);
+        LocalRegion r = (LocalRegion) this.ccp.cache.getRegion(regionName, true);
         if (r == null) {
           throw new RegionDestroyedException("Region could not be found for interest registration",
               regionName);
@@ -2055,7 +2125,7 @@ public class CacheClientProxy implements ClientSession {
 
     protected FilterProfile getProfile(String regionName) {
       try {
-        return this.ccp._cache.getFilterProfile(regionName);
+        return this.ccp.cache.getFilterProfile(regionName);
       } catch (CancelException e) {
         return null;
       }
@@ -2221,7 +2291,6 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-
   /**
    * Class <code>MessageDispatcher</code> is a <code>Thread</code> that processes messages bound for
    * the client by taking messsages from the message queue and sending them to the client over the
@@ -2234,34 +2303,17 @@ public class CacheClientProxy implements ClientSession {
      */
     protected final HARegionQueue _messageQueue;
 
-    // /**
-    // * An int used to keep track of the number of messages dropped for logging
-    // * purposes. If greater than zero then a warning has been logged about
-    // * messages being dropped.
-    // */
-    // private int _numberOfMessagesDropped = 0;
-
     /**
      * The proxy for which this dispatcher is processing messages
      */
     private final CacheClientProxy _proxy;
 
-    // /**
-    // * The conflator faciliates message conflation
-    // */
-    // protected BridgeEventConflator _eventConflator;
-
     /**
      * Whether the dispatcher is stopped
      */
     private volatile boolean _isStopped = true;
 
     /**
-     * guarded.By _pausedLock
-     */
-    // boolean _isPausedDispatcher = false;
-
-    /**
      * A lock object used to control pausing this dispatcher
      */
     protected final Object _pausedLock = new Object();
@@ -2274,11 +2326,6 @@ public class CacheClientProxy implements ClientSession {
     private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
 
     private final Lock socketWriteLock = socketLock.writeLock();
-    // /**
-    // * A boolean verifying whether a warning has already been issued if the
-    // * message queue has reached its capacity.
-    // */
-    // private boolean _messageQueueCapacityReachedWarning = false;
 
     /**
      * Constructor.
@@ -2303,7 +2350,7 @@ public class CacheClientProxy implements ClientSession {
         HARegionQueueAttributes harq = new HARegionQueueAttributes();
         harq.setBlockingQueueCapacity(proxy._maximumMessageCount);
         harq.setExpiryTime(proxy._messageTimeToLive);
-        ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
+        ((HAContainerWrapper) proxy.cacheClientNotifier.getHaContainer())
             .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());
         boolean createDurableQueue = proxy.proxyID.isDurable();
         boolean canHandleDelta = (proxy.clientVersion.compareTo(Version.GFE_61) >= 0)
@@ -2314,7 +2361,7 @@ public class CacheClientProxy implements ClientSession {
         }
         this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
             getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue,
-            proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
+            proxy.cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
             this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta);
         // Check if interests were registered during HARegion GII.
         if (this._proxy.hasRegisteredInterested()) {
@@ -2405,10 +2452,6 @@ public class CacheClientProxy implements ClientSession {
             Thread.sleep(500);
           } catch (InterruptedException e) {
             interrupted = true;
-            /*
-             * GemFireCache c = (GemFireCache)_cache;
-             * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);
-             */
           } catch (CancelException e) {
             break;
           } catch (CacheException e) {
@@ -2503,7 +2546,7 @@ public class CacheClientProxy implements ClientSession {
       ClientMessage clientMessage = null;
       while (!isStopped()) {
         // SystemFailure.checkFailure(); DM's stopper does this
-        if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
+        if (this._proxy.cache.getCancelCriterion().isCancelInProgress()) {
           break;
         }
         try {
@@ -2752,9 +2795,6 @@ public class CacheClientProxy implements ClientSession {
       }
       Message message = null;
 
-      // byte[] latestValue =
-      // this._eventConflator.getLatestValue(clientMessage);
-
       if (clientMessage instanceof ClientUpdateMessage) {
         byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue();
         if (logger.isTraceEnabled()) {
@@ -2771,7 +2811,7 @@ public class CacheClientProxy implements ClientSession {
 
         message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue);
 
-        if (AFTER_MESSAGE_CREATION_FLAG) {
+        if (afterMessageCreationForTesting) {
           ClientServerObserver bo = ClientServerObserverHolder.getInstance();
           bo.afterMessageCreation(message);
         }
@@ -2779,37 +2819,9 @@ public class CacheClientProxy implements ClientSession {
         message = clientMessage.getMessage(getProxy(), true /* notify */);
       }
 
-      // //////////////////////////////
-      // TEST CODE BEGIN (Throws exception to test closing proxy)
-      // if (true) throw new IOException("test");
-      // TEST CODE END
-      // //////////////////////////////
-      // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-      // latestValue);
-      // Message message = clientMessage.getMessage(); removed during merge.
-      // BugFix for BUG#38206 and BUG#37791
       if (!this._proxy.isPaused()) {
         sendMessage(message);
 
-        // //////////////////////////////
-        // TEST CODE BEGIN (Throws exception to test closing proxy)
-        // if (true) throw new IOException("test");
-        // TEST CODE END
-        // //////////////////////////////
-        // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-        // latestValue);
-        // Message message = clientMessage.getMessage(); removed during merge.
-        // message.setComms(getSocket(), getCommBuffer(), getStatistics());
-        // message.send();
-
-        // //////////////////////////////
-        // TEST CODE BEGIN (Introduces random wait in client)
-        // Sleep a random number of ms
-        // java.util.Random rand = new java.util.Random();
-        // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {}
-        // TEST CODE END
-        // //////////////////////////////
-
         if (logger.isTraceEnabled()) {
           logger.trace("{}: Dispatched {}", this, clientMessage);
         }
@@ -2851,7 +2863,7 @@ public class CacheClientProxy implements ClientSession {
       try {
         this._messageQueue.put(clientMessage);
         if (this._proxy.isPaused() && this._proxy.isDurable()) {
-          this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
+          this._proxy.cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
           if (logger.isDebugEnabled()) {
             logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
           }
@@ -2955,7 +2967,7 @@ public class CacheClientProxy implements ClientSession {
       this._pausedLock.notifyAll();
     }
 
-    protected Object deserialize(byte[] serializedBytes) {
+    private Object deserialize(byte[] serializedBytes) {
       Object deserializedObject = serializedBytes;
       // This is a debugging method so ignore all exceptions like
       // ClassNotFoundException
@@ -2979,89 +2991,7 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  /**
-   * Returns the current number of CQS the client installed.
-   *
-   * @return int the current count of CQs for this client
-   */
-  public int getCqCount() {
-    synchronized (this) {
-      return this._statistics.getCqCount();
-    }
-  }
-
-  /**
-   * Increment the number of CQs the client installed
-   *
-   */
-  public void incCqCount() {
-    synchronized (this) {
-      this._statistics.incCqCount();
-    }
-  }
-
-  /**
-   * Decrement the number of CQs the client installed
-   *
-   */
-  public synchronized void decCqCount() {
-    synchronized (this) {
-      this._statistics.decCqCount();
-    }
-  }
-
-  /**
-   * Returns true if the client has one CQ
-   *
-   * @return true if the client has one CQ
-   */
-  public boolean hasOneCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 1;
-    }
-  }
-
-  /**
-   * Returns true if the client has no CQs
-   *
-   * @return true if the client has no CQs
-   */
-  public boolean hasNoCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 0;
-    }
-  }
-
-  /**
-   * Get map of regions with empty data policy
-   *
-   * @since GemFire 6.1
-   */
-  public Map getRegionsWithEmptyDataPolicy() {
-    return regionsWithEmptyDataPolicy;
-  }
-
-  public int incrementAndGetPingCounter() {
-    int pingCount = this.pingCounter.incrementAndGet();
-    return pingCount;
-  }
-
-  public void resetPingCounter() {
-    this.pingCounter.set(0);
-  }
-
-  /**
-   * Returns the number of seconds that have elapsed since the Client proxy created.
-   * 
-   * @since GemFire 7.0
-   */
-  public long getUpTime() {
-    return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000);
-  }
-
   public interface TestHook {
-    public void doTestHook(String spot);
+    void doTestHook(String spot);
   }
-
-  public static TestHook testHook;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e21a834..6e8f9ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -22,7 +22,11 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CacheClientStatus;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.IncomingGatewayStatus;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -32,7 +36,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.logging.log4j.Logger;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 6bbe7b8..7d1603d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -352,8 +352,8 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     byte[] serializedValue = null;
     Message message = null;
     boolean conflation = false;
-    conflation = (proxy.clientConflation == HandShake.CONFLATION_ON)
-        || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && this.shouldBeConflated());
+    conflation = (proxy.isClientConflationOn())
+        || (proxy.isClientConflationDefault() && this.shouldBeConflated());
 
     if (latestValue != null) {
       serializedValue = latestValue;

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 6e119c0..d43244a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -62,6 +62,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireException;
+import org.apache.geode.LogWriter;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
@@ -1669,8 +1670,7 @@ public class HandShake implements ClientHandShake {
    * not
    */
   public static Object verifyCredentials(String authenticatorMethod, Properties credentials,
-      Properties securityProperties, InternalLogWriter logWriter,
-      InternalLogWriter securityLogWriter, DistributedMember member)
+      Properties securityProperties, LogWriter logWriter, LogWriter securityLogWriter, DistributedMember member)
       throws AuthenticationRequiredException, AuthenticationFailedException {
 
     if (!AcceptorImpl.isAuthenticationRequired()) {
@@ -1702,8 +1702,7 @@ public class HandShake implements ClientHandShake {
 
     String methodName = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     return verifyCredentials(methodName, this.credentials, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), this.id.getDistributedMember());
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(), this.id.getDistributedMember());
   }
 
   public void sendCredentialsForWan(OutputStream out, InputStream in) {
@@ -1731,8 +1730,7 @@ public class HandShake implements ClientHandShake {
     String authenticator = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     Properties peerWanProps = readCredentials(dis, dos, this.system);
     verifyCredentials(authenticator, peerWanProps, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), member);
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(), member);
   }
 
   private static int getKeySize(String skAlgo) {

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
index 1f8a564..34e6020 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
@@ -152,6 +152,16 @@ public class LogService extends LogManager {
   }
 
   /**
+   * Returns a Logger with the name of {@link #SECURITY_LOGGER_NAME}.
+   *
+   * @return The security Logger.
+   */
+  public static Logger getSecurityLogger() {
+    return new FastLogger(
+        LogManager.getLogger(SECURITY_LOGGER_NAME, GemFireParameterizedMessageFactory.INSTANCE));
+  }
+
+  /**
    * Returns a LogWriterLogger that is decorated with the LogWriter and LogWriterI18n methods.
    * <p>
    * This is the bridge to LogWriter and LogWriterI18n that we need to eventually stop using in

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7aa11b7..6cb28ee 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -14,21 +14,14 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
@@ -39,7 +32,6 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.BindException;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -85,7 +77,7 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
       } catch (IllegalArgumentException expected) {
       }
@@ -95,7 +87,7 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST,
-            CacheServer.DEFAULT_TCP_NO_DELAY);
+            CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns of zero");
       } catch (IllegalArgumentException expected) {
       }
@@ -105,12 +97,12 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         fail("Expecetd a BindException while attaching to the same port");
       } catch (BindException expected) {
       }
@@ -119,7 +111,7 @@ public class AcceptorImplJUnitTest {
           CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
-          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
       assertEquals(port2, a3.getPort());
       InternalDistributedSystem isystem =
           (InternalDistributedSystem) this.cache.getDistributedSystem();

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
new file mode 100644
index 0000000..a61f790
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+@Category(UnitTest.class)
+public class AcceptorImplTest {
+  // Unit test that constructs an AcceptorImpl directly; SocketCreatorFactory is configured before and closed after each test.
+  @Before
+  public void before() throws Exception {
+    DistributionConfigImpl distributionConfig = new DistributionConfigImpl(new Properties()); // built from empty Properties
+    SocketCreatorFactory.setDistributionConfig(distributionConfig);
+  }
+
+  @After
+  public void after() throws Exception {
+    SocketCreatorFactory.close(); // closes the factory that before() configured
+  }
+  // Passes mostly zero/null/false arguments to the AcceptorImpl constructor and checks only that an instance comes back.
+  @Test
+  public void constructWithDefaults() throws Exception {
+    /*
+    Problems: (NOTE(review): presumably the AcceptorImpl constructor statements that depend on
+    collaborators this test does not supply, e.g. a non-null cache; confirm against AcceptorImpl)
+        this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
+        messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
+
+        this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+        this.clientNotifier.getStats());
+
+        LoggingThreadGroup / ThreadFactory / ThreadPoolExecutor
+
+            isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+         isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+
+        String postAuthzFactoryName =
+            this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+     */
+    // Constructor arguments, declared in the same order in which they are passed below.
+    int port = 0;
+    String bindHostName = SocketCreator.getLocalHost().getHostName();
+    boolean notifyBySubscription = false;
+    int socketBufferSize = 1;
+    int maximumTimeBetweenPings = 0;
+    InternalCache internalCache = null; // NOTE(review): null cache; the statements quoted above use the cache - confirm the constructor tolerates this
+    int maxConnections = 0; // NOTE(review): AcceptorImplJUnitTest expects IllegalArgumentException for "max conns of zero" - confirm this value is intended
+    int maxThreads = 0;
+    int maximumMessageCount = 0;
+    int messageTimeToLive = 0;
+    ConnectionListener listener = null;
+    List overflowAttributesList = null;
+    boolean isGatewayReceiver = false;
+    List<GatewayTransportFilter> transportFilter = Collections.emptyList();
+    boolean tcpNoDelay = false;
+    CancelCriterion cancelCriterion = null; // NOTE(review): no cancel criterion supplied; other callers pass cache.getCancelCriterion()
+
+    AcceptorImpl acceptor = new AcceptorImpl(
+        port,
+        bindHostName,
+        notifyBySubscription,
+        socketBufferSize,
+        maximumTimeBetweenPings,
+        internalCache,
+        maxConnections,
+        maxThreads,
+        maximumMessageCount,
+        messageTimeToLive,
+        listener,
+        overflowAttributesList,
+        isGatewayReceiver,
+        transportFilter,
+        tcpNoDelay,
+        cancelCriterion
+    );
+    // `new` never yields null, so this assertion holds whenever the constructor returns; the test fails only if construction throws.
+    assertThat(acceptor).isNotNull();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
index 31f67aa..f4a8cc8 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
@@ -99,7 +99,7 @@ public class ClientConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void setIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", "15000");
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
index 1a76daa..efc0367 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -166,7 +166,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void installObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+    CacheClientProxy.setAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new DelaySendingEvent());
   }
 
@@ -176,7 +176,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void cleanupObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+    CacheClientProxy.unsetAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index b4f3185..5b340d1 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -62,13 +62,9 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
@@ -1014,7 +1010,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
       while (iter_prox.hasNext()) {
         CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
         // CCP should not contain region1
-        Set akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions;
+        Set akr = ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].regions;
         assertNotNull(akr);
         assertTrue(!akr.contains(Region.SEPARATOR + REGION_NAME1));
         // CCP should contain region2
@@ -1352,7 +1348,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
    *
    */
   public static void unsetSlowDispatcherFlag() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
index b1e16ee..275e458 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
@@ -112,7 +112,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    * 
    */
   public static void setIsSlowStart(String milis) {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", milis);
   }
 
@@ -121,7 +121,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void unsetIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
index 544f732..07e2220 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -459,7 +459,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
         iter_prox = ccn.getClientProxies().iterator();
         if (iter_prox.hasNext()) {
           proxy = (CacheClientProxy) iter_prox.next();
-          return proxy._messageDispatcher.isAlive();
+          return proxy.getMessageDispatcherForTesting().isAlive();
         } else {
           return false;
         }
@@ -510,7 +510,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
     if (iter_prox.hasNext()) {
       CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
       assertFalse("Dispatcher on secondary should not be alive",
-          proxy._messageDispatcher.isAlive());
+          proxy.getMessageDispatcherForTesting().isAlive());
     }
   }
 
@@ -818,7 +818,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+          Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
               .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null && keysMap.size() == 2;
         }
@@ -830,7 +830,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+      Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
           .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(2, keysMap.size());
@@ -879,7 +879,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+          Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
               .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null;
         }
@@ -891,7 +891,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+      Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
           .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(1, keysMap.size());

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
index 6aea509..3585c3e 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
@@ -471,7 +471,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -529,7 +529,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
     } catch (Exception ex) {
       fail("while setting verifyDispatcherIsNotAlive  " + ex);

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
index 041cd38..e9982b2 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
@@ -434,7 +434,7 @@ public class InterestListRecoveryDUnitTest extends JUnit4DistributedTestCase {
   public static Set getKeysOfInterestMap(CacheClientProxy proxy, String regionName) {
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]);
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]._keysOfInterest);
-    return proxy.cils[RegisterInterestTracker.interestListIndex].getProfile(regionName)
+    return proxy.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].getProfile(regionName)
         .getKeysOfInterestFor(proxy.getProxyID());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
index 4a98298..826bba9 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
@@ -189,10 +189,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            if (proxy._messageDispatcher == null) {
+            if (proxy.getMessageDispatcherForTesting() == null) {
               return false;
             }
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -245,7 +245,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
 
     } catch (Exception ex) {
@@ -427,7 +427,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+            Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
                 .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
             if (keysMap == null) {
               excuse = "keys of interest is null";
@@ -446,7 +446,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
         };
         Wait.waitForCriterion(wc, 180 * 1000, 2 * 1000, true);
 
-        Set keysMap = ccp.cils[RegisterInterestTracker.interestListIndex]
+        Set keysMap = ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
             .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
         assertTrue(keysMap.contains(k1));
         assertTrue(keysMap.contains(k2));

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
new file mode 100644
index 0000000..485ee5e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class Put65BenchTest {
+  // Runs the Put65 command once against Mockito mocks of the server connection and the request message.
+  public Command put65Command;
+  public ServerConnection mockServerConnection;
+  public Message mockMessage;
+  // Builds the command and every mock that benchmark() hands to it.
+  @Before
+  public void setup() throws Exception {
+    loadEmergencyClasses();
+
+    this.put65Command = Put65.getCommand();
+
+    this.mockServerConnection = mock(ServerConnection.class,
+        withSettings().name("mockServerConnection"));
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+    // Collaborators handed out by the mock connection's getters are themselves mocks.
+    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache"));
+    when(this.mockServerConnection.getCache()).thenReturn(mockCache);
+
+    TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager"));
+    when(mockCache.getTxManager()).thenReturn(mockTxManager);
+
+    CacheServerStats mockCacheServerStats =
+        mock(CacheServerStats.class, withSettings().name("mockCacheServerStats"));
+    when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats);
+
+    ClientProxyMembershipID mockProxyId =
+        mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+    Message mockErrorResponseMessage =
+        mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+    when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(mockErrorResponseMessage);
+    // Mock message parts, each stubbed to return one fixed value; they are bound to part indexes 0-7 further down.
+    Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+    when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+    Part mockOperationPart = mock(Part.class, withSettings().name("mockOperationPart"));
+    when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part mockFlagsPart = mock(Part.class, withSettings().name("mockFlagsPart"));
+    when(mockFlagsPart.getInt()).thenReturn(0);
+
+    Part mockKeyPart = mock(Part.class, withSettings().name("mockKeyPart"));
+    when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+    when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+    Part mockIsDeltaPart = mock(Part.class, withSettings().name("mockIsDeltaPart"));
+    when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part mockValuePart = mock(Part.class, withSettings().name("mockValuePart"));
+    when(mockValuePart.getObject()).thenReturn("mockValuePart");
+
+    Part mockEventPart = mock(Part.class, withSettings().name("mockEventPart"));
+    when(mockEventPart.getObject()).thenReturn("mockEventPart");
+
+    Part mockCallbackArgPart = mock(Part.class, withSettings().name("mockCallbackArgPart"));
+    when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
+
+    this.mockMessage = mock(Message.class, withSettings().name("mockMessage"));
+
+    when(this.mockMessage.getTransactionId()).thenReturn(NOTX);
+    // Part index -> mock part. NOTE(review): the order presumably mirrors what Put65 reads from the message - confirm against Put65.
+    when(this.mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+    when(this.mockMessage.getPart(1)).thenReturn(mockOperationPart);
+    when(this.mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+    when(this.mockMessage.getPart(3)).thenReturn(mockKeyPart);
+    when(this.mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+    when(this.mockMessage.getPart(5)).thenReturn(mockValuePart);
+    when(this.mockMessage.getPart(6)).thenReturn(mockEventPart);
+    when(this.mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
+  }
+  // Executes the command once with the mocks from setup(); there are no assertions, so it fails only if execute() throws.
+  @Test
+  public void benchmark() {
+    this.put65Command.execute(this.mockMessage, this.mockServerConnection);
+    // Message replyMessage = state.mockServerConnection.getReplyMessage(); // NOTE(review): `state` is not defined in this class
+    // blackhole.consume(replyMessage); // NOTE(review): `blackhole` is not defined here; presumably left over from a benchmark harness
+  }
+}


[2/4] geode git commit: WIP refactoring

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
new file mode 100644
index 0000000..3cc10e7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.ServerLauncher.Builder;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+@Category(IntegrationTest.class)
+public class Put65RealBenchTest {
+  // Starts a server through ServerLauncher, then runs the Put65 command once against a mocked connection and message.
+  private ServerConnection realServerConnection;
+
+  public Command put65Command;
+  public ServerConnection mockServerConnection;
+  public Message mockMessage;
+
+  private File workingDir;
+  private int serverPort;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  // Launches the server in the temporary folder and builds the mocks that benchmark() uses.
+  @Before
+  public void setup() throws Exception {
+    this.workingDir = temporaryFolder.getRoot();
+
+    this.serverPort = getRandomAvailablePort(SOCKET); // NOTE(review): serverPort is assigned here but not read anywhere else in this class
+
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
+        .setMemberName("server1")
+        .setRedirectOutput(true)
+        .setWorkingDirectory(this.workingDir.getAbsolutePath())
+        .set(MCAST_PORT, "0")
+        .set(LOCATORS, "")
+        .build();
+
+    serverLauncher.start(); // NOTE(review): this class has no @After; nothing visible here stops the launcher
+    // Fetches the cache, cache server and acceptor via unqualified helper calls. NOTE(review): none of these three locals is used below.
+    Cache cache = getCache(serverLauncher);
+    CacheServer cacheServer = getCacheServer(cache);
+    AcceptorImpl acceptor = getAcceptorImpl(cacheServer);
+
+    this.realServerConnection = null; // NOTE(review): placeholder; spiedInstance(...) below therefore receives null
+
+    this.mockServerConnection = mock(ServerConnection.class,
+        withSettings().name("mockServerConnection").spiedInstance(this.realServerConnection));
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    ClientProxyMembershipID mockProxyId =
+        mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+//    Message mockErrorResponseMessage =
+//        mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+//    when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(mockErrorResponseMessage);
+    // Mock message parts, each stubbed to return one fixed value; they are bound to part indexes 0-7 further down.
+    Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+    when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+    Part mockOperationPart = mock(Part.class, withSettings().name("mockOperationPart"));
+    when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part mockFlagsPart = mock(Part.class, withSettings().name("mockFlagsPart"));
+    when(mockFlagsPart.getInt()).thenReturn(0);
+
+    Part mockKeyPart = mock(Part.class, withSettings().name("mockKeyPart"));
+    when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+    when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+    Part mockIsDeltaPart = mock(Part.class, withSettings().name("mockIsDeltaPart"));
+    when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part mockValuePart = mock(Part.class, withSettings().name("mockValuePart"));
+    when(mockValuePart.getObject()).thenReturn("mockValuePart");
+
+    Part mockEventPart = mock(Part.class, withSettings().name("mockEventPart"));
+    when(mockEventPart.getObject()).thenReturn("mockEventPart");
+
+    Part mockCallbackArgPart = mock(Part.class, withSettings().name("mockCallbackArgPart"));
+    when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
+
+    this.mockMessage = mock(Message.class, withSettings().name("mockMessage"));
+
+    when(this.mockMessage.getTransactionId()).thenReturn(NOTX);
+    // Part index -> mock part. NOTE(review): the order presumably mirrors what Put65 reads from the message - confirm against Put65.
+    when(this.mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+    when(this.mockMessage.getPart(1)).thenReturn(mockOperationPart);
+    when(this.mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+    when(this.mockMessage.getPart(3)).thenReturn(mockKeyPart);
+    when(this.mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+    when(this.mockMessage.getPart(5)).thenReturn(mockValuePart);
+    when(this.mockMessage.getPart(6)).thenReturn(mockEventPart);
+    when(this.mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
+
+    this.put65Command = Put65.getCommand();
+  }
+  // Executes the command once with the mocks from setup(); there are no assertions, so it fails only if execute() throws.
+  @Test
+  public void benchmark() {
+    this.put65Command.execute(this.mockMessage, this.mockServerConnection);
+    // Message replyMessage = state.mockServerConnection.getReplyMessage(); // NOTE(review): `state` is not defined in this class
+    // blackhole.consume(replyMessage); // NOTE(review): `blackhole` is not defined here; presumably left over from a benchmark harness
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
index 0a45494..a0b41b1 100644
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -2880,7 +2880,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
         public void run2() throws CacheException {
           // Set the Test Hook!
           // This test hook will pause during the drain process
-          CacheClientProxy.testHook = new RejectClientReconnectTestHook();
+          CacheClientProxy.setTestHook(new RejectClientReconnectTestHook());
         }
       });
 
@@ -2909,8 +2909,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
           WaitCriterion ev = new WaitCriterion() {
             @Override
             public boolean done() {
-              return CacheClientProxy.testHook != null
-                  && (((RejectClientReconnectTestHook) CacheClientProxy.testHook)
+              return CacheClientProxy.getTestHook() != null
+                  && (((RejectClientReconnectTestHook) CacheClientProxy.getTestHook())
                       .wasClientRejected());
             }
 
@@ -2921,7 +2921,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
           };
           Wait.waitForCriterion(ev, 10 * 1000, 200, true);
           assertTrue(
-              ((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+              ((RejectClientReconnectTestHook) CacheClientProxy.getTestHook()).wasClientRejected());
         }
       });
 
@@ -2958,7 +2958,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
       this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
         @Override
         public void run2() throws CacheException {
-          CacheClientProxy.testHook = null;
+          CacheClientProxy.unsetTestHook();
         }
       });
     }
@@ -3012,7 +3012,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
 
               // Set the Test Hook!
               // This test hook will pause during the drain process
-              CacheClientProxy.testHook = new CqExceptionDueToActivatingClientTestHook();
+              CacheClientProxy.setTestHook(new CqExceptionDueToActivatingClientTestHook());
 
               final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
               final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
@@ -3072,7 +3072,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
       this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
         @Override
         public void run2() throws CacheException {
-          CacheClientProxy.testHook = null;
+          CacheClientProxy.unsetTestHook();
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
index 5533376..5c2620f 100755
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -439,15 +439,15 @@ public class DurableClientTestCase extends JUnit4DistributedTestCase {
         // Find the proxy
         CacheClientProxy proxy = getClientProxy();
         assertNotNull(proxy);
-        assertNotNull(proxy._socket);
+        assertNotNull(proxy.getSocketForTesting());
         long end = System.currentTimeMillis() + 60000;
 
-        while (!proxy._socket.isClosed()) {
+        while (!proxy.getSocketForTesting().isClosed()) {
           if (System.currentTimeMillis() > end) {
             break;
           }
         }
-        assertTrue(proxy._socket.isClosed());
+        assertTrue(proxy.getSocketForTesting().isClosed());
       }
     });
 

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
index d548613..53995df 100755
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -151,19 +151,19 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
   }
 
   public static void setCacheClientProxyTestHook() {
-    CacheClientProxy.testHook = new CacheClientProxy.TestHook() {
+    CacheClientProxy.setTestHook(new CacheClientProxy.TestHook() {
       @Override
       public void doTestHook(String spot) {
         if (spot.equals("CLIENT_RECONNECTED")) {
           afterProxyReinitialized++;
         }
       }
-    };
+    });
   }
 
   public static void checkResultAndUnsetCacheClientProxyTestHook() {
     // Reinitialize only happened once
-    CacheClientProxy.testHook = null;
+    CacheClientProxy.unsetTestHook();
     assertEquals(1, afterProxyReinitialized);
     afterProxyReinitialized = 0;
   }


[4/4] geode git commit: WIP refactoring

Posted by kl...@apache.org.
WIP refactoring


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/135d674a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/135d674a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/135d674a

Branch: refs/heads/feature/GEODE-2632
Commit: 135d674ac2eb80089ba21279ed261bcf4fc2339f
Parents: 1881851
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 5 10:24:23 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 5 10:24:23 2017 -0700

----------------------------------------------------------------------
 .../sockets/command/ClientCachePutBench.java    |  16 +-
 .../cache/tier/sockets/command/Put65Bench.java  |  94 +--
 .../geode/internal/cache/CacheServerImpl.java   |   2 +-
 .../geode/internal/cache/tier/Acceptor.java     |   2 +-
 .../cache/tier/sockets/AcceptorImpl.java        |  50 +-
 .../cache/tier/sockets/CacheClientNotifier.java | 589 ++++------------
 .../cache/tier/sockets/CacheClientProxy.java    | 696 +++++++++----------
 .../cache/tier/sockets/ClientHealthMonitor.java |  15 +-
 .../tier/sockets/ClientUpdateMessageImpl.java   |   4 +-
 .../internal/cache/tier/sockets/HandShake.java  |  10 +-
 .../geode/internal/logging/LogService.java      |  10 +
 .../tier/sockets/AcceptorImplJUnitTest.java     |  18 +-
 .../cache/tier/sockets/AcceptorImplTest.java    | 112 +++
 .../tier/sockets/ClientConflationDUnitTest.java |   2 +-
 .../ClientServerForceInvalidateDUnitTest.java   |   4 +-
 .../tier/sockets/ClientServerMiscDUnitTest.java |   8 +-
 .../cache/tier/sockets/ConflationDUnitTest.java |   4 +-
 .../cache/tier/sockets/HAInterestTestCase.java  |  12 +-
 .../sockets/HAStartupAndFailoverDUnitTest.java  |   4 +-
 .../sockets/InterestListRecoveryDUnitTest.java  |   2 +-
 .../tier/sockets/RedundancyLevelTestBase.java   |  10 +-
 .../tier/sockets/command/Put65BenchTest.java    | 116 ++++
 .../sockets/command/Put65RealBenchTest.java     | 145 ++++
 .../sockets/DurableClientSimpleDUnitTest.java   |  14 +-
 .../tier/sockets/DurableClientTestCase.java     |   6 +-
 .../cache/wan/Simple2CacheServerDUnitTest.java  |   6 +-
 26 files changed, 959 insertions(+), 992 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
index a1cbd81..df51b78 100644
--- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
@@ -14,9 +14,12 @@
  */
 package org.apache.geode.internal.cache.tier.sockets.command;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.io.FileUtils.*;
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
 import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -24,6 +27,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.AbstractLauncher.Status;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -121,16 +125,22 @@ public class ClientCachePutBench {
       command.add(ServerLauncher.Command.START.getName());
       command.add("server1");
       command.add("--server-port=" + this.serverPort);
-      // command.add("--redirect-output");
+      // put65Command.add("--redirect-output");
 
       this.process = new ProcessBuilder(command).directory(this.temporaryFolder.getRoot()).start();
 
-      boolean forever = true;
-      while (forever) {
+      boolean sleep = false;
+      while (sleep) {
         assertThat(this.process.isAlive()).isTrue();
         Thread.sleep(10000);
       }
 
+      ServerLauncher serverLauncher = new ServerLauncher.Builder()
+          .setWorkingDirectory(this.temporaryFolder.getRoot().getAbsolutePath()).build();
+
+      await().atMost(2, MINUTES)
+          .until(() -> assertThat(serverLauncher.status().getStatus()).isEqualTo(ONLINE));
+
       this.clientCache =
           new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create();
       this.region =

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
index 6ccd8c3..d393769 100644
--- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
@@ -16,10 +16,8 @@ package org.apache.geode.internal.cache.tier.sockets.command;
 
 import static org.apache.geode.SystemFailure.loadEmergencyClasses;
 import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
-import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -30,8 +28,6 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
@@ -42,78 +38,82 @@ public class Put65Bench {
 
   @State(Scope.Benchmark)
   public static class ServerConnectionState {
-    public Command command;
+    public Command put65Command;
     public ServerConnection mockServerConnection;
-    public Message message;
+    public Message mockMessage;
 
     @Setup(Level.Trial)
     public void setup() throws Exception {
       loadEmergencyClasses();
 
-      this.command = Put65.getCommand();
+      this.put65Command = Put65.getCommand();
 
-      this.mockServerConnection = mock(ServerConnection.class);
+      this.mockServerConnection = mock(ServerConnection.class,
+          withSettings().defaultAnswer(CALLS_REAL_METHODS).name("mockServerConnection"));
       when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
 
-      TXManagerImpl txManager = mock(TXManagerImpl.class);
-      GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
-      when(cache.getTxManager()).thenReturn(txManager);
+      GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache"));
+      when(this.mockServerConnection.getCache()).thenReturn(mockCache);
 
-      when(this.mockServerConnection.getCache()).thenReturn(cache);
+      TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager"));
+      when(mockCache.getTxManager()).thenReturn(mockTxManager);
 
-      CacheServerStats cacheServerStats = mock(CacheServerStats.class);
-      when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+      CacheServerStats mockCacheServerStats =
+          mock(CacheServerStats.class, withSettings().name("mockCacheServerStats"));
+      when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats);
 
-      // .getDistributedMember()
-      ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+      ClientProxyMembershipID mockProxyId =
+          mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
       when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
 
-      Message errorResponseMessage = mock(Message.class);
-      when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+      Message mockErrorResponseMessage =
+          mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+      when(this.mockServerConnection.getErrorResponseMessage())
+          .thenReturn(mockErrorResponseMessage);
 
-      Part regionNamePart = mock(Part.class);
-      when(regionNamePart.getString()).thenReturn("regionNamePart");
+      Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+      when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
 
-      Part operationPart = mock(Part.class);
-      when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+      Part mockOperationPart = mock(Part.class);
+      when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
 
-      Part flagsPart = mock(Part.class);
-      when(flagsPart.getInt()).thenReturn(0);
+      Part mockFlagsPart = mock(Part.class);
+      when(mockFlagsPart.getInt()).thenReturn(0);
 
-      Part keyPart = mock(Part.class);
-      when(keyPart.getObject()).thenReturn("keyPart");
-      when(keyPart.getStringOrObject()).thenReturn("keyPart");
+      Part mockKeyPart = mock(Part.class);
+      when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+      when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
 
-      Part isDeltaPart = mock(Part.class);
-      when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+      Part mockIsDeltaPart = mock(Part.class);
+      when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
 
-      Part valuePart = mock(Part.class);
-      when(valuePart.getObject()).thenReturn("valuePart");
+      Part mockValuePart = mock(Part.class);
+      when(mockValuePart.getObject()).thenReturn("mockValuePart");
 
-      Part eventPart = mock(Part.class);
-      when(eventPart.getObject()).thenReturn("eventPart");
+      Part mockEventPart = mock(Part.class);
+      when(mockEventPart.getObject()).thenReturn("mockEventPart");
 
-      Part callbackArgPart = mock(Part.class);
-      when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+      Part mockCallbackArgPart = mock(Part.class);
+      when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
 
-      message = mock(Message.class);
+      mockMessage = mock(Message.class);
 
-      when(message.getTransactionId()).thenReturn(NOTX);
+      when(mockMessage.getTransactionId()).thenReturn(NOTX);
 
-      when(message.getPart(0)).thenReturn(regionNamePart);
-      when(message.getPart(1)).thenReturn(operationPart);
-      when(message.getPart(2)).thenReturn(flagsPart);
-      when(message.getPart(3)).thenReturn(keyPart);
-      when(message.getPart(4)).thenReturn(isDeltaPart);
-      when(message.getPart(5)).thenReturn(valuePart);
-      when(message.getPart(6)).thenReturn(eventPart);
-      when(message.getPart(7)).thenReturn(callbackArgPart);
+      when(mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+      when(mockMessage.getPart(1)).thenReturn(mockOperationPart);
+      when(mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+      when(mockMessage.getPart(3)).thenReturn(mockKeyPart);
+      when(mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+      when(mockMessage.getPart(5)).thenReturn(mockValuePart);
+      when(mockMessage.getPart(6)).thenReturn(mockEventPart);
+      when(mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
     }
   }
 
   // @Benchmark
   public void benchmark(ServerConnectionState state, Blackhole blackhole) {
-    state.command.execute(state.message, state.mockServerConnection);
+    state.put65Command.execute(state.mockMessage, state.mockServerConnection);
     // Message replyMessage = state.mockServerConnection.getReplyMessage();
     // blackhole.consume(replyMessage);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index a3c4a93..2294fb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -317,7 +317,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
         getSocketBufferSize(), getMaximumTimeBetweenPings(), this.cache, getMaxConnections(),
         getMaxThreads(), getMaximumMessageCount(), getMessageTimeToLive(), this.loadMonitor,
         overflowAttributesList, this.isGatewayReceiver, this.gatewayTransportFilters,
-        this.tcpNoDelay);
+        this.tcpNoDelay, this.cache.getCancelCriterion());
 
     this.acceptor.start();
     this.advisor.handshake();

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..97dcba5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -25,7 +25,7 @@ import org.apache.geode.internal.Version;
  *
  * @since GemFire 2.0.2
  */
-public abstract class Acceptor {
+public interface Acceptor {
 
   // The following are communications "mode" bytes sent as the first byte of a
   // client/server handshake. They must not be larger than 1 byte

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ed29472..47749f8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -57,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLException;
 
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -97,7 +100,7 @@ import org.apache.geode.internal.util.ArrayUtils;
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable {
+public class AcceptorImpl implements Acceptor, Runnable {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
@@ -283,21 +286,31 @@ public class AcceptorImpl extends Acceptor implements Runnable {
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
-   * 
+   *
+   * @param cancelCriterion checked via checkCancelInProgress(null) inside the constructor's loops
    * @see SocketCreator#createServerSocket(int, int, InetAddress)
    * @see ClientHealthMonitor
    * @since GemFire 5.7
    */
   public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
-      int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
-      int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
-      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
-      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay) throws IOException {
+                      int socketBufferSize, int maximumTimeBetweenPings,
+                      InternalCache internalCache,
+                      int maxConnections, int maxThreads, int maximumMessageCount,
+                      int messageTimeToLive,
+                      ConnectionListener listener, List overflowAttributesList,
+                      boolean isGatewayReceiver,
+                      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+                      final CancelCriterion cancelCriterion) throws IOException {
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
     this.notifyBySubscription = notifyBySubscription;
     this.isGatewayReceiver = isGatewayReceiver;
     this.gatewayTransportFilters = transportFilter;
+
+    this.socketBufferSize = socketBufferSize;
+    this.cache = internalCache;
+    this.crHelper = new CachedRegionHelper(this.cache);
+
     {
       int tmp_maxConnections = maxConnections;
       if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) {
@@ -375,12 +388,6 @@ public class AcceptorImpl extends Acceptor implements Runnable {
             .getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
       }
 
-      final GemFireCacheImpl gc;
-      if (getCachedRegionHelper() != null) {
-        gc = (GemFireCacheImpl) getCachedRegionHelper().getCache();
-      } else {
-        gc = null;
-      }
       final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG).intValue();
       final long tilt = System.currentTimeMillis() + 120 * 1000;
 
@@ -422,9 +429,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
               Thread.currentThread().interrupt();
             }
           }
-          if (gc != null) {
-            gc.getCancelCriterion().checkCancelInProgress(null);
-          }
+          cancelCriterion.checkCancelInProgress(null);
         } // for
       } // isSelector
       else { // !isSelector
@@ -452,9 +457,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
               Thread.currentThread().interrupt();
             }
           }
-          if (gc != null) {
-            gc.getCancelCriterion().checkCancelInProgress(null);
-          }
+          cancelCriterion.checkCancelInProgress(null);
         } // for
       } // !isSelector
 
@@ -485,15 +488,14 @@ public class AcceptorImpl extends Acceptor implements Runnable {
 
     }
 
-    this.cache = internalCache;
-    this.crHelper = new CachedRegionHelper(this.cache);
+    final StatisticsFactory statsFactory = isGatewayReceiver ?
+        new DummyStatisticsFactory() : this.cache.getDistributedSystem();
 
-    this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
-        messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
-    this.socketBufferSize = socketBufferSize;
+    this.clientNotifier = CacheClientNotifier.getInstance(this.cache, this.stats, statsFactory, maximumMessageCount,
+        messageTimeToLive, this.connectionListener, overflowAttributesList, isGatewayReceiver);
 
     // Create the singleton ClientHealthMonitor
-    this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+    this.healthMonitor = ClientHealthMonitor.getInstance(this.cache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
     {

http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 28d6ae2..25142a0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.internal.security.SecurityService;
 import org.apache.logging.log4j.Logger;
 import org.apache.shiro.subject.Subject;
 
@@ -79,7 +80,6 @@ import org.apache.geode.distributed.internal.MessageWithReply;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.ClassLoadUtil;
-import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.InternalInstantiator;
 import org.apache.geode.internal.net.SocketCloser;
@@ -113,10 +113,8 @@ import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.InternalLogWriter;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.security.AccessControl;
 import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
@@ -125,32 +123,29 @@ import org.apache.geode.security.AuthenticationRequiredException;
  * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
  * to clients requesting notification of updates and notifies them when updates occur.
  *
- *
  * @since GemFire 3.2
  */
 @SuppressWarnings({"synthetic-access", "deprecation"})
 public class CacheClientNotifier {
   private static final Logger logger = LogService.getLogger();
+  private static final Logger securityLogger = LogService.getSecurityLogger();
 
   private static volatile CacheClientNotifier ccnSingleton;
 
   /**
    * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
-   *
-   * @param cache The GemFire <code>Cache</code>
-   * @param acceptorStats
-   * @param maximumMessageCount
-   * @param messageTimeToLive
-   * @param listener
-   * @param overflowAttributesList
-   * @return A <code>CacheClientNotifier</code> instance
    */
-  public static synchronized CacheClientNotifier getInstance(Cache cache,
-      CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
-      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
+  public static synchronized CacheClientNotifier getInstance(final Cache cache,
+                                                             final CacheServerStats acceptorStats,
+                                                             final StatisticsFactory statsFactory,
+                                                             final int maximumMessageCount,
+                                                             final int messageTimeToLive,
+                                                             final ConnectionListener listener,
+                                                             final List overflowAttributesList,
+                                                             final boolean isGatewayReceiver) {
     if (ccnSingleton == null) {
-      ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
-          messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
+      ccnSingleton = new CacheClientNotifier(cache, acceptorStats, statsFactory, maximumMessageCount,
+          messageTimeToLive, listener);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,13 +153,6 @@ public class CacheClientNotifier {
       // In this case, the HaContainer should be lazily created here
       ccnSingleton.initHaContainer(overflowAttributesList);
     }
-    // else {
-    // ccnSingleton.acceptorStats = acceptorStats;
-    // ccnSingleton.maximumMessageCount = maximumMessageCount;
-    // ccnSingleton.messageTimeToLive = messageTimeToLive;
-    // ccnSingleton._connectionListener = listener;
-    // ccnSingleton.setCache((GemFireCache)cache);
-    // }
     return ccnSingleton;
   }
 
@@ -173,6 +161,53 @@ public class CacheClientNotifier {
   }
 
   /**
+   * Constructor.
+   * @param cache The GemFire <code>Cache</code>
+   * @param acceptorStats
+   * @param statsFactory
+   * @param maximumMessageCount
+   * @param messageTimeToLive
+   * @param listener a listener which should receive notifications about queues being added or
+   *        removed.
+   */
+  private CacheClientNotifier(final Cache cache,
+                              final CacheServerStats acceptorStats,
+                              final StatisticsFactory statsFactory,
+                              final int maximumMessageCount,
+                              final int messageTimeToLive,
+                              final ConnectionListener listener) {
+    // Set the Cache
+    this.setCache((GemFireCacheImpl) cache);
+    this.acceptorStats = acceptorStats;
+    // we only need one thread per client and wait 50ms for close
+    this.socketCloser = new SocketCloser(1, 50);
+    this._connectionListener = listener;
+
+    this.maximumMessageCount = maximumMessageCount;
+    this.messageTimeToLive = messageTimeToLive;
+
+    this._statistics = new CacheClientNotifierStats(statsFactory);
+
+    try {
+      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+      if (this.logFrequency <= 0) {
+        this.logFrequency = DEFAULT_LOG_FREQUENCY;
+      }
+    } catch (Exception e) {
+      this.logFrequency = DEFAULT_LOG_FREQUENCY;
+    }
+
+    eventEnqueueWaitTime =
+        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+    if (eventEnqueueWaitTime < 0) {
+      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+    }
+
+    // Schedule task to periodically ping clients.
+    scheduleClientPingTask();
+  }
+
+  /**
    * Writes a given message to the output stream
    *
    * @param dos the <code>DataOutputStream</code> to use for writing the message
@@ -257,32 +292,12 @@ public class CacheClientNotifier {
     writeMessage(dos, type, ex.toString(), clientVersion);
   }
 
-  // /**
-  // * Factory method to return the singleton <code>CacheClientNotifier</code>
-  // * instance.
-  // * @return the singleton <code>CacheClientNotifier</code> instance
-  // */
-  // public static CacheClientNotifier getInstance()
-  // {
-  // return _instance;
-  // }
-
-  // /**
-  // * Shuts down the singleton <code>CacheClientNotifier</code> instance.
-  // */
-  // public static void shutdownInstance()
-  // {
-  // if (_instance == null) return;
-  // _instance.shutdown();
-  // _instance = null;
-  // }
-
   /**
    * Registers a new client updater that wants to receive updates with this server.
    *
    * @param socket The socket over which the server communicates with the client.
    */
-  public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
+  void registerClient(Socket socket, boolean isPrimary, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Since no remote ports were specified in the message, wait for them.
     long startTime = this._statistics.startTime();
@@ -329,7 +344,7 @@ public class CacheClientNotifier {
     }
   }
 
-  protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+  private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
       boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Read the ports and throw them away. We no longer need them
@@ -382,26 +397,25 @@ public class CacheClientNotifier {
       // TODO:hitesh
       Properties credentials = HandShake.readCredentials(dis, dos, system);
       if (credentials != null && proxy != null) {
-        if (securityLogWriter.fineEnabled()) {
-          securityLogWriter
-              .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
+        if (securityLogger.isDebugEnabled()) {
+          securityLogger
+              .debug("CacheClientNotifier: verifying credentials for proxyID: {}", proxyID);
         }
         Object subject = HandShake.verifyCredentials(authenticator, credentials,
-            system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
+            system.getSecurityProperties(), system.getLogWriter(), system.getSecurityLogWriter(), member);
         if (subject instanceof Principal) {
           Principal principal = (Principal) subject;
-          if (securityLogWriter.fineEnabled()) {
-            securityLogWriter
-                .fine("CacheClientNotifier: successfully verified credentials for proxyID: "
-                    + proxyID + " having principal: " + principal.getName());
+          if (securityLogger.isDebugEnabled()) {
+            securityLogger
+                .debug("CacheClientNotifier: successfully verified credentials for proxyID: {} having principal: {}", proxyID, principal.getName());
           }
 
           String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
           if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
             if (principal == null) {
-              securityLogWriter.warning(
-                  LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
-                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
+              securityLogger.warn(
+                  LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
+                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID}));
             }
             Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
             authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
@@ -417,15 +431,15 @@ public class CacheClientNotifier {
           LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
               .toLocalizedString(e));
     } catch (AuthenticationRequiredException ex) {
-      securityLogWriter.warning(
-          LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
-          new Object[] {proxyID, ex});
+      securityLogger.warn(
+          LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
+          new Object[] {proxyID, ex}));
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
       return;
     } catch (AuthenticationFailedException ex) {
-      securityLogWriter.warning(
-          LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
-          new Object[] {proxyID, ex});
+      securityLogger.warn(
+          LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
+          new Object[] {proxyID, ex}));
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
       return;
     } catch (CacheException e) {
@@ -445,11 +459,9 @@ public class CacheClientNotifier {
       return;
     }
 
-
     this._statistics.endClientRegistration(startTime);
   }
 
-
   /**
    * Registers a new client that wants to receive updates with this server.
    *
@@ -504,7 +516,8 @@ public class CacheClientNotifier {
               "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
               proxyId.getDurableId());
         }
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache, this._cache.getDistributedSystem(), SecurityService
+                .getSecurityService(), socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
         successful = this.initializeProxy(l_proxy);
       } else {
@@ -516,8 +529,8 @@ public class CacheClientNotifier {
         qSize = proxy.getQueueSize();
         // A proxy exists for this durable client. It must be reinitialized.
         if (l_proxy.isPaused()) {
-          if (CacheClientProxy.testHook != null) {
-            CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
+          if (CacheClientProxy.getTestHook() != null) {
+            CacheClientProxy.getTestHook().doTestHook("CLIENT_PRE_RECONNECT");
           }
           if (l_proxy.lockDrain()) {
             try {
@@ -531,8 +544,8 @@ public class CacheClientNotifier {
               l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
                   clientVersion);
               l_proxy.setMarkerEnqueued(true);
-              if (CacheClientProxy.testHook != null) {
-                CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
+              if (CacheClientProxy.getTestHook() != null) {
+                CacheClientProxy.getTestHook().doTestHook("CLIENT_RECONNECTED");
               }
             } finally {
               l_proxy.unlockDrain();
@@ -543,8 +556,8 @@ public class CacheClientNotifier {
                     .toLocalizedString();
             logger.warn(unsuccessfulMsg);
             responseByte = HandShake.REPLY_REFUSED;
-            if (CacheClientProxy.testHook != null) {
-              CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
+            if (CacheClientProxy.getTestHook() != null) {
+              CacheClientProxy.getTestHook().doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
             }
           }
         } else {
@@ -582,7 +595,7 @@ public class CacheClientNotifier {
 
       if (toCreateNewProxy) {
         // Create the new proxy for this non-durable client
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache, this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
         successful = this.initializeProxy(l_proxy);
       }
@@ -754,10 +767,8 @@ public class CacheClientNotifier {
    * Unregisters an existing client from this server.
    *
    * @param memberId Uniquely identifies the client
-   *
-   *
    */
-  public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+  void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
     if (logger.isDebugEnabled()) {
       logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
     }
@@ -781,8 +792,6 @@ public class CacheClientNotifier {
 
   /**
    * The client represented by the proxyId is ready to receive updates.
-   *
-   * @param proxyId
    */
   public void readyForEvents(ClientProxyMembershipID proxyId) {
     CacheClientProxy proxy = getClientProxy(proxyId);
@@ -817,7 +826,6 @@ public class CacheClientNotifier {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonNotifyClients(event, null);
-
     }
   }
 
@@ -829,7 +837,6 @@ public class CacheClientNotifier {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonNotifyClients(event, cmsg);
-
     }
   }
 
@@ -839,10 +846,6 @@ public class CacheClientNotifier {
 
     FilterInfo filterInfo = event.getLocalFilterInfo();
 
-    // if (_logger.fineEnabled()) {
-    // _logger.fine("Client dispatcher processing event " + event);
-    // }
-
     FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
     if (filterInfo != null) {
       // if the routing was made using an old profile we need to recompute it
@@ -964,10 +967,8 @@ public class CacheClientNotifier {
     if (filterInfo.filterProcessedLocally) {
       removeDestroyTokensFromCqResultKeys(event, filterInfo);
     }
-
   }
 
-
   private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event,
       FilterInfo filterInfo) {
     FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
@@ -986,38 +987,22 @@ public class CacheClientNotifier {
     }
   }
 
-
   /**
    * delivers the given message to all proxies for routing. The message should already have client
    * interest established, or override the isClientInterested method to implement its own routing
-   * 
-   * @param clientMessage
    */
   public static void routeClientMessage(Conflatable clientMessage) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
-                                                                                             // to
-                                                                                             // use
-                                                                                             // keySet
-                                                                                             // here
-                                                                                             // because
-                                                                                             // all
-                                                                                             // we
-                                                                                             // do
-                                                                                             // is
-                                                                                             // call
-                                                                                             // getClientProxy
-                                                                                             // with
-                                                                                             // these
-                                                                                             // keys
+      // ok to use keySet here because all we do is call getClientProxy with these keys
+      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet());
     }
   }
 
   /*
    * this is for server side registration of client queue
    */
-  public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+  static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
       ClientProxyMembershipID clientProxyMembershipId) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
@@ -1029,8 +1014,8 @@ public class CacheClientNotifier {
   private void singletonRouteClientMessage(Conflatable conflatable,
       Collection<ClientProxyMembershipID> filterClients) {
 
-    this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
-                                                                  // but no p2p distribution
+    // bug #43942 - client notified but no p2p distribution
+    this._cache.getCancelCriterion().checkCancelInProgress(null);
 
     List<CacheClientProxy> deadProxies = null;
     for (ClientProxyMembershipID clientId : filterClients) {
@@ -1061,7 +1046,8 @@ public class CacheClientNotifier {
    * processes the given collection of durable and non-durable client identifiers, returning a
    * collection of non-durable identifiers of clients connected to this VM
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+    // TODO: false is ignored here because true is hardcoded in the other method
     return getProxyIDs(mixedDurableAndNonDurableIDs, false);
   }
 
@@ -1070,7 +1056,7 @@ public class CacheClientNotifier {
    * collection of non-durable identifiers of clients connected to this VM. This version can check
    * for proxies in initialization as well as fully initialized proxies.
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
       boolean proxyInInitMode) {
     Set<ClientProxyMembershipID> result = new HashSet();
     for (Object id : mixedDurableAndNonDurableIDs) {
@@ -1209,7 +1195,7 @@ public class CacheClientNotifier {
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
    * @return whether the <code>CacheClientNotifier</code> supports the input operation
    */
-  protected boolean supportsOperation(EnumListenerEvent operation) {
+  private boolean supportsOperation(EnumListenerEvent operation) {
     return operation == EnumListenerEvent.AFTER_CREATE
         || operation == EnumListenerEvent.AFTER_UPDATE
         || operation == EnumListenerEvent.AFTER_DESTROY
@@ -1219,87 +1205,6 @@ public class CacheClientNotifier {
         || operation == EnumListenerEvent.AFTER_REGION_INVALIDATE;
   }
 
-  // /**
-  // * Queues the <code>ClientUpdateMessage</code> to be distributed
-  // * to interested clients. This method is not being used currently.
-  // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued
-  // */
-  // protected void notifyClients(final ClientUpdateMessage clientMessage)
-  // {
-  // if (USE_SYNCHRONOUS_NOTIFICATION)
-  // {
-  // // Execute the method in the same thread as the caller
-  // deliver(clientMessage);
-  // }
-  // else {
-  // // Obtain an Executor and use it to execute the method in its own thread
-  // try
-  // {
-  // getExecutor().execute(new Runnable()
-  // {
-  // public void run()
-  // {
-  // deliver(clientMessage);
-  // }
-  // }
-  // );
-  // } catch (InterruptedException e)
-  // {
-  // _logger.warning("CacheClientNotifier: notifyClients interrupted", e);
-  // Thread.currentThread().interrupt();
-  // }
-  // }
-  // }
-
-  // /**
-  // * Updates the information this <code>CacheClientNotifier</code> maintains
-  // * for a given edge client. It is invoked when a edge client re-connects to
-  // * the server.
-  // *
-  // * @param clientHost
-  // * The host on which the client runs (i.e. the host the
-  // * CacheClientNotifier uses to communicate with the
-  // * CacheClientUpdater) This is used with the clientPort to uniquely
-  // * identify the client
-  // * @param clientPort
-  // * The port through which the server communicates with the client
-  // * (i.e. the port the CacheClientNotifier uses to communicate with
-  // * the CacheClientUpdater) This is used with the clientHost to
-  // * uniquely identify the client
-  // * @param remotePort
-  // * The port through which the client communicates with the server
-  // * (i.e. the new port the ConnectionImpl uses to communicate with the
-  // * ServerConnection)
-  // * @param membershipID
-  // * Uniquely idenifies the client
-  // */
-  // public void registerClientPort(String clientHost, int clientPort,
-  // int remotePort, ClientProxyMembershipID membershipID)
-  // {
-  // if (_logger.fineEnabled())
-  // _logger.fine("CacheClientNotifier: Registering client port: "
-  // + clientHost + ":" + clientPort + " with remote port " + remotePort
-  // + " and ID " + membershipID);
-  // for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-  // CacheClientProxy proxy = (CacheClientProxy)i.next();
-  // if (_logger.finerEnabled())
-  // _logger.finer("CacheClientNotifier: Potential client: " + proxy);
-  // //if (proxy.representsCacheClientUpdater(clientHost, clientPort))
-  // if (proxy.isMember(membershipID)) {
-  // if (_logger.finerEnabled())
-  // _logger
-  // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match");
-  // proxy.addPort(remotePort);
-  // }
-  // else {
-  // if (_logger.finerEnabled())
-  // _logger.finer("CacheClientNotifier: Host and port "
-  // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort()
-  // + " do not match " + clientHost + ":" + clientPort);
-  // }
-  // }
-  // }
-
   /**
    * Registers client interest in the input region and key.
    *
@@ -1315,7 +1220,8 @@ public class CacheClientNotifier {
   public void registerClientInterest(String regionName, Object keyOfInterest,
       ClientProxyMembershipID membershipID, int interestType, boolean isDurable,
       boolean sendUpdatesAsInvalidates, boolean manageEmptyRegions, int regionDataPolicy,
-      boolean flushState) throws IOException, RegionDestroyedException {
+      boolean flushState)
+      throws IOException, RegionDestroyedException {
 
     CacheClientProxy proxy = getClientProxy(membershipID, true);
 
@@ -1350,18 +1256,6 @@ public class CacheClientNotifier {
     }
   }
 
-  /*
-   * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID
-   * membershipID) throws RegionNotFoundException { // Update Regions book keeping. LocalRegion
-   * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new
-   * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 //
-   * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " +
-   * membershipID + " :Throwing RegionDestroyedException as region: " + regionName +
-   * " is not present."); } throw new RegionDestroyedException("registerInterest failed",
-   * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID);
-   * } }
-   */
-
   /**
    * Store region and delta relation
    * 
@@ -1457,7 +1351,6 @@ public class CacheClientNotifier {
     }
   }
 
-
   /**
    * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
    * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
@@ -1484,9 +1377,6 @@ public class CacheClientNotifier {
             }
           }
         }
-        // else {
-        // This is a replay-of-event case.
-        // }
       } else {
         // This wrapper resides in haContainer.
         wrapper.setClientUpdateMessage(null);
@@ -1541,7 +1431,7 @@ public class CacheClientNotifier {
    * 
    * @return the <code>CacheClientProxy</code> associated to the durableClientId
    */
-  public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
+  private CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final boolean isTraceEnabled = logger.isTraceEnabled();
 
@@ -1584,46 +1474,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
-   * 
-   * @return the <code>CacheClientProxy</code> associated to the same distributed system
-   */
-  public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (isDebugEnabled) {
-      logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
-          membershipID);
-      logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
-          this, getClientProxies().size());
-      /*
-       * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " +
-       * getClientProxies());
-       */
-    }
-    CacheClientProxy proxy = null;
-    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled) {
-        logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
-      }
-      if (clientProxy.isSameDSMember(membershipID)) {
-        proxy = clientProxy;
-        if (isDebugEnabled) {
-          logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
-              membershipID);
-        }
-        break;
-      }
-    }
-    return proxy;
-  }
-
-
-  /**
    * It will remove the clients connected to the passed acceptorId. If its the only server, shuts
    * down this instance.
    */
-  protected synchronized void shutdown(long acceptorId) {
+  synchronized void shutdown(long acceptorId) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
@@ -1685,14 +1539,14 @@ public class CacheClientNotifier {
    *
    * @param proxy The <code>CacheClientProxy</code> to add
    */
-  protected void addClientProxy(CacheClientProxy proxy) throws IOException {
+  private void addClientProxy(CacheClientProxy proxy) throws IOException {
     // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
     getCache(); // ensure cache reference is up to date so firstclient state is correct
     this._clientProxies.put(proxy.getProxyID(), proxy);
     // Remove this proxy from the init proxy list.
     removeClientInitProxy(proxy);
     this._connectionListener.queueAdded(proxy.getProxyID());
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    if (!(proxy.isClientConflationOn())) {
       // Delta not supported with conflation ON
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       /*
@@ -1704,22 +1558,20 @@ public class CacheClientNotifier {
       }
     }
     this.timedOutDurableClientProxies.remove(proxy.getProxyID());
-
   }
 
-  protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
+  private void addClientInitProxy(CacheClientProxy proxy) throws IOException {
     this._initClientProxies.put(proxy.getProxyID(), proxy);
   }
 
-  protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
+  private void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
     this._initClientProxies.remove(proxy.getProxyID());
   }
 
-  protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
+  private boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
     return this._initClientProxies.containsKey(proxy.getProxyID());
   }
 
-
   /**
    * Returns (possibly stale) set of memberIds for all clients being actively notified by this
    * server.
@@ -1781,7 +1633,6 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasPrimaryForDurableClient(String durableId) {
-
     for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
       CacheClientProxy proxy = (CacheClientProxy) iter.next();
       ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1818,7 +1669,9 @@ public class CacheClientNotifier {
     return ccp.getQueueSizeStat();
   }
 
-  // closes the cq and drains the queue
+  /**
+   * closes the cq and drains the queue
+   */
   public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
     CacheClientProxy proxy = getClientProxy(durableClientId);
     // close and drain
@@ -1828,33 +1681,29 @@ public class CacheClientNotifier {
     return false;
   }
 
-
   /**
    * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
    *
    * @param proxy The <code>CacheClientProxy</code> to remove
    */
-  protected void removeClientProxy(CacheClientProxy proxy) {
-    // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
-    // Exception("stack trace"));
+  void removeClientProxy(CacheClientProxy proxy) {
     ClientProxyMembershipID client = proxy.getProxyID();
     this._clientProxies.remove(client);
     this._connectionListener.queueRemoved();
     ((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client);
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    if (!(proxy.isClientConflationOn())) {
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       if (chm != null) {
         chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
       }
     }
-
   }
 
   void durableClientTimedOut(ClientProxyMembershipID client) {
     this.timedOutDurableClientProxies.add(client);
   }
 
-  public boolean isTimedOut(ClientProxyMembershipID client) {
+  private boolean isTimedOut(ClientProxyMembershipID client) {
     return this.timedOutDurableClientProxies.contains(client);
   }
 
@@ -1868,17 +1717,6 @@ public class CacheClientNotifier {
     return Collections.unmodifiableCollection(this._clientProxies.values());
   }
 
-  // /**
-  // * Returns the <code>Executor</code> that delivers messages to the
-  // * <code>CacheClientProxy</code> instances.
-  // * @return the <code>Executor</code> that delivers messages to the
-  // * <code>CacheClientProxy</code> instances
-  // */
-  // protected Executor getExecutor()
-  // {
-  // return _executor;
-  // }
-
   private void closeAllClientCqs(CacheClientProxy proxy) {
     CqService cqService = proxy.getCache().getCqService();
     if (cqService != null) {
@@ -1901,7 +1739,6 @@ public class CacheClientNotifier {
 
   /**
    * Shuts down durable client proxy
-   *
    */
   public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
     CacheClientProxy ccp = getClientProxy(durableClientId);
@@ -1930,8 +1767,9 @@ public class CacheClientNotifier {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     for (Iterator i = deadProxies.iterator(); i.hasNext();) {
       CacheClientProxy proxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled)
+      if (isDebugEnabled) {
         logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+      }
 
       // Close the proxy
       boolean keepProxy = false;
@@ -1939,7 +1777,7 @@ public class CacheClientNotifier {
         keepProxy = proxy.close(false, stoppedNormally);
       } catch (CancelException e) {
         throw e;
-      } catch (Exception e) {
+      } catch (Exception e) { // TODO: at least log at debug level
       }
 
       // Remove the proxy if necessary. It might not be necessary to remove the
@@ -1960,7 +1798,6 @@ public class CacheClientNotifier {
     } // for
   }
 
-
   /**
    * Registers a new <code>InterestRegistrationListener</code> with the set of
    * <code>InterestRegistrationListener</code>s.
@@ -1999,18 +1836,16 @@ public class CacheClientNotifier {
   }
 
   /**
-   * 
    * @since GemFire 5.8Beta
    */
-  protected boolean containsInterestRegistrationListeners() {
+  boolean containsInterestRegistrationListeners() {
     return !this.writableInterestRegistrationListeners.isEmpty();
   }
 
   /**
-   * 
    * @since GemFire 5.8Beta
    */
-  protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+  void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
     for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
       InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
       if (event.isRegister()) {
@@ -2040,8 +1875,6 @@ public class CacheClientNotifier {
       GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
         this._cache = cache;
-        this.logWriter = cache.getInternalLogWriter();
-        this.securityLogWriter = cache.getSecurityInternalLogWriter();
       }
     }
     return this._cache;
@@ -2072,68 +1905,6 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Constructor.
-   *
-   * @param cache The GemFire <code>Cache</code>
-   * @param acceptorStats
-   * @param maximumMessageCount
-   * @param messageTimeToLive
-   * @param listener a listener which should receive notifications abouts queues being added or
-   *        removed.
-   * @param overflowAttributesList
-   */
-  private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount,
-      int messageTimeToLive, ConnectionListener listener, List overflowAttributesList,
-      boolean isGatewayReceiver) {
-    // Set the Cache
-    this.setCache((GemFireCacheImpl) cache);
-    this.acceptorStats = acceptorStats;
-    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
-                                                 // for close
-
-    // Set the LogWriter
-    this.logWriter = (InternalLogWriter) cache.getLogger();
-
-    this._connectionListener = listener;
-
-    // Set the security LogWriter
-    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
-    this.maximumMessageCount = maximumMessageCount;
-    this.messageTimeToLive = messageTimeToLive;
-
-    // Initialize the statistics
-    StatisticsFactory factory;
-    if (isGatewayReceiver) {
-      factory = new DummyStatisticsFactory();
-    } else {
-      factory = this.getCache().getDistributedSystem();
-    }
-    this._statistics = new CacheClientNotifierStats(factory);
-
-    // Initialize the executors
-    // initializeExecutors(this._logger);
-
-    try {
-      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
-      if (this.logFrequency <= 0) {
-        this.logFrequency = DEFAULT_LOG_FREQUENCY;
-      }
-    } catch (Exception e) {
-      this.logFrequency = DEFAULT_LOG_FREQUENCY;
-    }
-
-    eventEnqueueWaitTime =
-        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
-    if (eventEnqueueWaitTime < 0) {
-      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
-    }
-
-    // Schedule task to periodically ping clients.
-    scheduleClientPingTask();
-  }
-
-  /**
    * This message is used to send interest registration to another server. Since interest
    * registration performs a state-flush operation, this message must not be transmitted on an
    * ordered socket.
@@ -2228,104 +1999,6 @@ public class CacheClientNotifier {
 
   }
 
-
-  // * Initializes the <code>QueuedExecutor</code> and
-  // <code>PooledExecutor</code>
-  // * used to deliver messages to <code>CacheClientProxy</code> instances.
-  // * @param logger The GemFire <code>LogWriterI18n</code>
-  // */
-  // private void initializeExecutors(LogWriterI18n logger)
-  // {
-  // // Create the thread groups
-  // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache
-  // Client Notifier Logger Group", logger);
-  // final ThreadGroup notifierGroup =
-  // new ThreadGroup("Cache Client Notifier Group")
-  // {
-  // public void uncaughtException(Thread t, Throwable e)
-  // {
-  // Thread.dumpStack();
-  // loggerGroup.uncaughtException(t, e);
-  // //CacheClientNotifier.exceptionInThreads = true;
-  // }
-  // };
-  //
-  // // Originally set ThreadGroup to be a daemon, but it was causing the
-  // following
-  // // exception after five minutes of non-activity (the keep alive time of the
-  // // threads in the PooledExecutor.
-  //
-  // // java.lang.IllegalThreadStateException
-  // // at java.lang.ThreadGroup.add(Unknown Source)
-  // // at java.lang.Thread.init(Unknown Source)
-  // // at java.lang.Thread.<init>(Unknown Source)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321)
-  // // at
-  // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512)
-  // // at
-  // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271)
-  //
-  // //notifierGroup.setDaemon(true);
-  //
-  // if (USE_QUEUED_EXECUTOR)
-  // createQueuedExecutor(notifierGroup);
-  // else
-  // createPooledExecutor(notifierGroup);
-  // }
-
-  // /**
-  // * Creates the <code>QueuedExecutor</code> used to deliver messages
-  // * to <code>CacheClientProxy</code> instances
-  // * @param notifierGroup The <code>ThreadGroup</code> to which the
-  // * <code>QueuedExecutor</code>'s <code>Threads</code> belong
-  // */
-  // protected void createQueuedExecutor(final ThreadGroup notifierGroup)
-  // {
-  // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue());
-  // queuedExecutor.setThreadFactory(new ThreadFactory()
-  // {
-  // public Thread newThread(Runnable command)
-  // {
-  // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client
-  // Notifier");
-  // thread.setDaemon(true);
-  // return thread;
-  // }
-  // });
-  // _executor = queuedExecutor;
-  // }
-
-  // /**
-  // * Creates the <code>PooledExecutor</code> used to deliver messages
-  // * to <code>CacheClientProxy</code> instances
-  // * @param notifierGroup The <code>ThreadGroup</code> to which the
-  // * <code>PooledExecutor</code>'s <code>Threads</code> belong
-  // */
-  // protected void createPooledExecutor(final ThreadGroup notifierGroup)
-  // {
-  // PooledExecutor pooledExecutor = new PooledExecutor(new
-  // BoundedLinkedQueue(4096), 50);
-  // pooledExecutor.setMinimumPoolSize(10);
-  // pooledExecutor.setKeepAliveTime(1000 * 60 * 5);
-  // pooledExecutor.setThreadFactory(new ThreadFactory()
-  // {
-  // public Thread newThread(Runnable command)
-  // {
-  // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client
-  // Notifier");
-  // thread.setDaemon(true);
-  // return thread;
-  // }
-  // });
-  // pooledExecutor.createThreads(5);
-  // _executor = pooledExecutor;
-  // }
-
   protected void deliverInterestChange(ClientProxyMembershipID proxyID,
       ClientInterestMessageImpl message) {
     DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
@@ -2471,23 +2144,6 @@ public class CacheClientNotifier {
    */
   protected static final int ALL_PORTS = -1;
 
-  // /**
-  // * Whether to synchonously deliver messages to proxies.
-  // * This is currently hard-coded to true to ensure ordering.
-  // */
-  // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION =
-  // true;
-  // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION");
-
-  // /**
-  // * Whether to use the <code>QueuedExecutor</code> (or the
-  // * <code>PooledExecutor</code>) to deliver messages to proxies.
-  // * Currently, delivery is synchronous. No <code>Executor</code> is
-  // * used.
-  // */
-  // protected static final boolean USE_QUEUED_EXECUTOR =
-  // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR");
-
   /**
    * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
    * CacheClientProxy. Note that the keys in this map are not updated when a durable client
@@ -2512,14 +2168,7 @@ public class CacheClientNotifier {
    * direct reference to _cache in CacheClientNotifier code. Instead, you should always use
    * <code>getCache()</code>
    */
-  private GemFireCacheImpl _cache;
-
-  private InternalLogWriter logWriter;
-
-  /**
-   * The GemFire security <code>LogWriter</code>
-   */
-  private InternalLogWriter securityLogWriter;
+  private GemFireCacheImpl _cache; // TODO: not thread-safe
 
   /** the maximum number of messages that can be enqueued in a client-queue. */
   private int maximumMessageCount;
@@ -2543,10 +2192,6 @@ public class CacheClientNotifier {
    */
   private volatile HAContainerWrapper haContainer;
 
-  // /**
-  // * The singleton <code>CacheClientNotifier</code> instance
-  // */
-  // protected static CacheClientNotifier _instance;
   /**
    * The size of the server-to-client communication socket buffers. This can be modified using the
    * BridgeServer.SOCKET_BUFFER_SIZE system property.