You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/07 19:49:52 UTC

[01/12] geode git commit: GEODE-2684 Connection & ConnectionTable cleanup [Forced Update!]

Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632 c43cc7aab -> 7a83cccb1 (forced update)


GEODE-2684 Connection & ConnectionTable cleanup

Removed dead code and the indirect access to TCPConduit that went through the
connection table.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6b2b7b2f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6b2b7b2f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6b2b7b2f

Branch: refs/heads/feature/GEODE-2632
Commit: 6b2b7b2f7f3f63b8ae638e9afffa5edc0f763783
Parents: 391502a
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:13:01 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/tcp/Connection.java   | 188 ++++++-------------
 .../geode/internal/tcp/ConnectionTable.java     |  68 ++-----
 .../geode/internal/tcp/DirectReplySender.java   |   2 +-
 .../apache/geode/internal/tcp/MsgReader.java    |   2 +-
 .../apache/geode/internal/tcp/NIOMsgReader.java |   2 +-
 5 files changed, 82 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a0af245..c57a0ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -72,8 +72,6 @@ public class Connection implements Runnable {
   public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
   public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
   public final static int DIRECT_ACK_BIT = 0x20;
-  // We no longer support early ack
-  // public final static int EARLY_ACK_BIT = 0x10;
 
   public static final int MSG_HEADER_SIZE_OFFSET = 0;
   public static final int MSG_HEADER_TYPE_OFFSET = 4;
@@ -95,7 +93,9 @@ public class Connection implements Runnable {
       "member unexpectedly shut down shared, unordered connection";
 
   /** the table holding this connection */
-  final ConnectionTable owner;
+  private final ConnectionTable owner;
+
+  private final TCPConduit conduit;
 
   /**
    * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
@@ -113,15 +113,6 @@ public class Connection implements Runnable {
   /** The idle timeout timer task for this connection */
   private SystemTimerTask idleTask;
 
-  /**
-   * Returns the depth of unshared reader threads from this thread to the original
-   * non-reader-thread. E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) ->
-   * reader(domino=3)
-   */
-  public static int getDominoCount() {
-    return dominoCount.get().intValue();
-  }
-
   private final static ThreadLocal isReaderThread = new ThreadLocal();
 
   public final static void makeReaderThread() {
@@ -129,7 +120,7 @@ public class Connection implements Runnable {
     makeReaderThread(true);
   }
 
-  private final static void makeReaderThread(boolean v) {
+  private static void makeReaderThread(boolean v) {
     isReaderThread.set(v);
   }
 
@@ -150,7 +141,7 @@ public class Connection implements Runnable {
     if (connectTimeoutStr != null) {
       P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
     } else {
-      P2P_CONNECT_TIMEOUT = 6 * this.owner.owner.getDM().getConfig().getMemberTimeout();
+      P2P_CONNECT_TIMEOUT = 6 * this.conduit.getDM().getConfig().getMemberTimeout();
     }
     IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
     return P2P_CONNECT_TIMEOUT;
@@ -367,20 +358,18 @@ public class Connection implements Runnable {
   /** the buffer used for NIO message receipt */
   ByteBuffer nioInputBuffer;
 
-  /** the position of the next message's content */
-  // int nioMessageStart;
-
   /** the length of the next message to be dispatched */
   int nioMessageLength;
-  // byte nioMessageVersion;
 
   /** the type of message being received */
   byte nioMessageType;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
+
   /** caches a msg destreamer that is currently not being used */
   MsgDestreamer idleMsgDestreamer;
+
   /**
    * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using
    * nio
@@ -409,8 +398,6 @@ public class Connection implements Runnable {
   private int sendBufferSize = -1;
   private int recvBufferSize = -1;
 
-  private ReplySender replySender;
-
   private void setSendBufferSize(Socket sock) {
     setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
   }
@@ -541,6 +528,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = true;
     this.owner = t;
     this.socket = socket;
@@ -628,7 +616,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
     bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
     bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
-    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
     bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
@@ -707,19 +695,16 @@ public class Connection implements Runnable {
       my_okHandshakeBytes = okHandshakeBytes;
     }
     if (useNIO()) {
+      assert my_okHandshakeBuf != null;
       synchronized (my_okHandshakeBuf) {
         my_okHandshakeBuf.position(0);
         nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
       }
     } else {
       synchronized (outLock) {
-        try {
-          // this.writerThread = Thread.currentThread();
-          this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
-          this.output.flush();
-        } finally {
-          // this.writerThread = null;
-        }
+        assert my_okHandshakeBytes != null;
+        this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
+        this.output.flush();
       }
     }
   }
@@ -832,7 +817,7 @@ public class Connection implements Runnable {
   /**
    * asynchronously close this connection
    * 
-   * @param beingSick
+   * @param beingSick test hook to simulate sickness in communications & membership
    */
   private void asyncClose(boolean beingSick) {
     // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
@@ -890,8 +875,7 @@ public class Connection implements Runnable {
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
     final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
-    // connectHandshake.reset();
-    /**
+    /*
      * Note a byte of zero is always written because old products serialized a member id with always
      * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
      * valid address would never be 0.
@@ -925,8 +909,6 @@ public class Connection implements Runnable {
   private void handshakeStream() throws IOException {
     waitForAddressCompletion();
 
-    // this.output = new BufferedOutputStream(getSocket().getOutputStream(),
-    // owner.getConduit().bufferSize);
     this.output = getSocket().getOutputStream();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
     DataOutputStream os = new DataOutputStream(baos);
@@ -961,17 +943,12 @@ public class Connection implements Runnable {
     lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
     lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
     lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
-    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
     lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
     synchronized (outLock) {
-      try {
-        // this.writerThread = Thread.currentThread();
-        this.output.write(lenbytes, 0, lenbytes.length);
-        this.output.write(msg, 0, msg.length);
-        this.output.flush();
-      } finally {
-        // this.writerThread = null;
-      }
+      this.output.write(lenbytes, 0, lenbytes.length);
+      this.output.write(msg, 0, msg.length);
+      this.output.flush();
     }
   }
 
@@ -1091,7 +1068,7 @@ public class Connection implements Runnable {
         // create connection
         try {
           conn = null;
-          conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
+          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
         } catch (javax.net.ssl.SSLHandshakeException se) {
           // no need to retry if certificates were rejected
           throw se;
@@ -1206,7 +1183,7 @@ public class Connection implements Runnable {
 
   private void setRemoteAddr(DistributedMember m) {
     this.remoteAddr = this.owner.getDM().getCanonicalId(m);
-    MembershipManager mgr = this.owner.owner.getMembershipManager();
+    MembershipManager mgr = this.conduit.getMembershipManager();
     mgr.addSurpriseMember(m);
   }
 
@@ -1214,9 +1191,8 @@ public class Connection implements Runnable {
    * creates a new connection to a remote server. We are initiating this connection; the other side
    * must accept us We will almost always send messages; small acks are received.
    */
-  private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder,
-      DistributedMember remoteID, boolean sharedResource)
-      throws IOException, DistributedSystemDisconnectedException {
+  private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
+      boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
 
     // initialize a socket upfront. So that the
     InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
@@ -1224,6 +1200,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = false;
     this.owner = t;
     this.sharedResource = sharedResource;
@@ -1248,7 +1225,7 @@ public class Connection implements Runnable {
 
         channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-        /**
+        /*
          * If conserve-sockets is false, the socket can be used for receiving responses, so set the
          * receive buffer accordingly.
          */
@@ -1261,7 +1238,7 @@ public class Connection implements Runnable {
         setSendBufferSize(channel.socket());
         channel.configureBlocking(true);
 
-        int connectTime = getP2PConnectTimeout();;
+        int connectTime = getP2PConnectTimeout();
 
         try {
           channel.socket().connect(addr, connectTime);
@@ -1276,7 +1253,7 @@ public class Connection implements Runnable {
             Thread.currentThread().interrupt();
           }
           throw c;
-        } catch (CancelledKeyException e) {
+        } catch (CancelledKeyException | ClosedSelectorException e) {
           // bug #44469: for some reason NIO throws this runtime exception
           // instead of an IOException on timeouts
           ConnectException c = new ConnectException(
@@ -1284,14 +1261,6 @@ public class Connection implements Runnable {
                   .toLocalizedString(new Object[] {connectTime}));
           c.initCause(e);
           throw c;
-        } catch (ClosedSelectorException e) {
-          // bug #44808: for some reason JRockit NIO thorws this runtime exception
-          // instead of an IOException on timeouts
-          ConnectException c = new ConnectException(
-              LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
-                  .toLocalizedString(new Object[] {connectTime}));
-          c.initCause(e);
-          throw c;
         }
       } finally {
         this.owner.removeConnectingSocket(channel.socket());
@@ -1309,7 +1278,6 @@ public class Connection implements Runnable {
         setSocketBufferSize(this.socket, false, socketBufferSize, true);
         setSendBufferSize(this.socket);
       } else {
-        // socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
         Socket s = new Socket();
         this.socket = s;
         s.setTcpNoDelay(true);
@@ -1335,13 +1303,12 @@ public class Connection implements Runnable {
    * must not be doing it correctly.
    */
   private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
-  protected static final int BATCH_BUFFER_SIZE =
+  private static final int BATCH_BUFFER_SIZE =
       Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
-  protected static final int BATCH_FLUSH_MS =
-      Integer.getInteger("p2p.batchFlushTime", 50).intValue();
-  protected Object batchLock;
-  protected ByteBuffer fillBatchBuffer;
-  protected ByteBuffer sendBatchBuffer;
+  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
+  private Object batchLock;
+  private ByteBuffer fillBatchBuffer;
+  private ByteBuffer sendBatchBuffer;
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
@@ -1446,13 +1413,7 @@ public class Connection implements Runnable {
                   SocketChannel channel = getSocket().getChannel();
                   nioWriteFully(channel, sendBatchBuffer, false, null);
                   sendBatchBuffer.clear();
-                } catch (IOException ex) {
-                  logger.fatal(LocalizedMessage.create(
-                      LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
-                  readerShuttingDown = true;
-                  requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0
-                      .toLocalizedString(ex));
-                } catch (ConnectionException ex) {
+                } catch (IOException | ConnectionException ex) {
                   logger.fatal(LocalizedMessage.create(
                       LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
                   readerShuttingDown = true;
@@ -1526,13 +1487,6 @@ public class Connection implements Runnable {
     return this.closing.get();
   }
 
-  /**
-   * Used to close a connection that has not yet been registered with the distribution manager.
-   */
-  void closePartialConnect(String reason) {
-    close(reason, false, false, false, false);
-  }
-
   void closePartialConnect(String reason, boolean beingSick) {
     close(reason, false, false, beingSick, false);
   }
@@ -1619,9 +1573,9 @@ public class Connection implements Runnable {
       // if network partition detection is enabled or this is an admin vm
       // we can't wait for the reader thread when running in an IBM JRE. See
       // bug 41889
-      if (this.owner.owner.config.getEnableNetworkPartitionDetection()
-          || this.owner.owner.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
-          || this.owner.owner.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+      if (this.conduit.config.getEnableNetworkPartitionDetection()
+          || this.conduit.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
+          || this.conduit.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
         isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
       }
       {
@@ -1673,6 +1627,7 @@ public class Connection implements Runnable {
               }
             }
           } else {
+            // noinspection ConstantConditions
             this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
           }
         } else if (!this.isReceiver) {
@@ -1735,7 +1690,7 @@ public class Connection implements Runnable {
       initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
         if (!this.sharedResource) {
-          this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+          this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
@@ -1759,7 +1714,7 @@ public class Connection implements Runnable {
   }
 
   private String p2pReaderName() {
-    StringBuffer sb = new StringBuffer(64);
+    StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
       sb.append("P2P message reader@");
     } else {
@@ -1973,8 +1928,8 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return (msg.indexOf("forcibly closed") >= 0) || (msg.indexOf("reset by peer") >= 0)
-        || (msg.indexOf("connection reset") >= 0);
+    return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
+        || (msg.contains("connection reset"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2012,7 +1967,7 @@ public class Connection implements Runnable {
           this.idleMsgDestreamer = null;
         } else {
           result = new MsgDestreamer(this.owner.getConduit().stats,
-              this.owner.owner.getCancelCriterion(), v);
+              this.conduit.getCancelCriterion(), v);
         }
         result.setName(p2pReaderName() + " msgId=" + msgId);
         this.destreamerMap.put(key, result);
@@ -2103,7 +2058,7 @@ public class Connection implements Runnable {
         /* byte msgHdrVersion = */ calcHdrVersion(len);
         len = calcMsgByteSize(len);
         int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) ((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff * 0x100)
+        short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
             + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
         boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
         if (myDirectAck) {
@@ -2384,7 +2339,7 @@ public class Connection implements Runnable {
                   // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
                   // will prefer shared sockets");
                 }
-                this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
               }
 
               if (logger.isDebugEnabled()) {
@@ -2541,11 +2496,6 @@ public class Connection implements Runnable {
         }
         bytesSoFar += bytesThisTime;
       } catch (InterruptedIOException io) {
-        // try { Thread.sleep(10); }
-        // catch (InterruptedException ie) {
-        // Thread.currentThread().interrupt();
-        // }
-
         // Current thread has been interrupted. Regard it similar to an EOF
         this.readerShuttingDown = true;
         try {
@@ -2582,7 +2532,7 @@ public class Connection implements Runnable {
     final boolean origSocketInUse = this.socketInUse;
     byte originalState = -1;
     synchronized (stateLock) {
-      originalState = this.connectionState;;
+      originalState = this.connectionState;
       this.connectionState = STATE_SENDING;
     }
     this.socketInUse = true;
@@ -2597,13 +2547,8 @@ public class Connection implements Runnable {
         } else {
           byte[] bytesToWrite = getBytesToWrite(buffer);
           synchronized (outLock) {
-            try {
-              // this.writerThread = Thread.currentThread();
-              this.output.write(bytesToWrite);
-              this.output.flush();
-            } finally {
-              // this.writerThread = null;
-            }
+            this.output.write(bytesToWrite);
+            this.output.flush();
           }
         }
       }
@@ -2763,15 +2708,7 @@ public class Connection implements Runnable {
     return bytesToWrite;
   }
 
-  // private String socketInfo() {
-  // return (" socket: " + getSocket().getLocalAddress() + ":" + getSocket().getLocalPort() + " -> "
-  // +
-  // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection = " +
-  // System.identityHashCode(this));
-  //
-  // }
-
-  private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
+  private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
       throws ConnectionException {
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -2891,7 +2828,7 @@ public class Connection implements Runnable {
    * 
    * @throws ConnectionException if the conduit has stopped
    */
-  private final boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
+  private boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
       throws ConnectionException {
     if (!addToQueue(buffer, msg, true)) {
       return false;
@@ -2931,7 +2868,7 @@ public class Connection implements Runnable {
     this.pusherThread.start();
   }
 
-  private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
+  private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
     ByteBuffer result = null;
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -3152,7 +3089,7 @@ public class Connection implements Runnable {
    * Return false if socket writes to be done async/nonblocking Return true if socket writes to be
    * done sync/blocking
    */
-  private final boolean useSyncWrites(boolean forceAsync) {
+  private boolean useSyncWrites(boolean forceAsync) {
     if (forceAsync) {
       return false;
     }
@@ -3185,7 +3122,7 @@ public class Connection implements Runnable {
 
   static private final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2)
 
-  private final void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
+  private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage p_msg, final DMStats stats) throws IOException {
     DistributionMessage msg = p_msg;
     // async/non-blocking
@@ -3301,7 +3238,7 @@ public class Connection implements Runnable {
                 if (msToWait <= 0) {
                   Thread.yield();
                 } else {
-                  boolean interrupted = Thread.interrupted();;
+                  boolean interrupted = Thread.interrupted();
                   try {
                     Thread.sleep(msToWait);
                   } catch (InterruptedException ex) {
@@ -3401,10 +3338,10 @@ public class Connection implements Runnable {
   /**
    * stateLock is used to synchronize state changes.
    */
-  protected Object stateLock = new Object();
+  private final Object stateLock = new Object();
 
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState = STATE_IDLE;
+  private byte connectionState = STATE_IDLE;
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
@@ -3420,16 +3357,11 @@ public class Connection implements Runnable {
   /** the connection is in use and is reading a message */
   protected static final byte STATE_READING = 5;
 
-  protected static final String[] STATE_NAMES =
-      new String[] {"idle", "sending", "post_sending", "reading_ack", "received_ack", "reading"};
   /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 
   /** set to true if we exceeded the ack-wait-threshold waiting for a response */
   protected volatile boolean ackTimedOut;
 
-  private static int ACK_SIZE = 1;
-  private static byte ACK_BYTE = 37;
-
   /**
    * @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever.
    * @param msInterval interval between checks
@@ -3899,7 +3831,7 @@ public class Connection implements Runnable {
                     // } else {
                     // ConnectionTable.threadWantsSharedResources();
                   }
-                  this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                  this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
                   // Because this thread is not shared resource, it will be used for direct
                   // ack. Direct ack messages can be large. This call will resize the send
                   // buffer.
@@ -4039,6 +3971,10 @@ public class Connection implements Runnable {
     }
   }
 
+  protected TCPConduit getConduit() {
+    return this.conduit;
+  }
+
   protected Socket getSocket() throws SocketException {
     // fix for bug 37286
     Socket result = this.socket;
@@ -4177,7 +4113,7 @@ public class Connection implements Runnable {
   boolean nioChecked;
   boolean useNIO;
 
-  private final boolean useNIO() {
+  private boolean useNIO() {
     if (TCPConduit.useSSL) {
       return false;
     }
@@ -4193,7 +4129,7 @@ public class Connection implements Runnable {
     if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) {
       String os = System.getProperty("os.name");
       if (os != null) {
-        if (os.indexOf("Windows") != -1) {
+        if (os.contains("Windows")) {
           this.useNIO = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 08a9009..c55af82 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -64,17 +64,9 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
  * 
  * @since GemFire 2.1
  */
-/*
- * Note: We no longer use InputMultiplexer If InputMux is reinstated then the manager needs to be
- * initialized and all lines that have a NOMUX preface should be uncommented
- * 
- */
 public class ConnectionTable {
   private static final Logger logger = LogService.getLogger();
 
-  /** a random number generator for secondary connection selection */
-  // static java.util.Random random = new java.util.Random();
-
   /** warning when descriptor limit reached */
   private static boolean ulimitWarningIssued;
 
@@ -82,6 +74,7 @@ public class ConnectionTable {
    * true if the current thread wants non-shared resources
    */
   private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+
   /**
    * Used for messages whose order must be preserved Only connections used for sending messages, and
    * receiving acks, will be put in this map.
@@ -132,9 +125,7 @@ public class ConnectionTable {
   /**
    * the conduit for this table
    */
-  protected final TCPConduit owner;
-  // ARB: temp making this protected to provide access to Connection.
-  // private final TCPConduit owner;
+  private final TCPConduit owner;
 
   /**
    * true if this table is no longer in use
@@ -205,17 +196,10 @@ public class ConnectionTable {
     return (Boolean) threadWantsOwnResources.get();
   }
 
-  // public static void setThreadOwnsResourcesRegistration(
-  // Boolean newValue) {
-  // threadWantsOwnResources.set(newValue);
-  // }
-  // private Map connections = new HashMap();
-  /* NOMUX: private InputMuxManager inputMuxManager; */
-  // private int lowWater;
-  // private int highWater;
+  public TCPConduit getOwner() {
+    return owner;
+  }
 
-  // private static boolean TRACK_SERVER_CONNECTIONS =
-  // System.getProperty("p2p.bidirectional", "true").equals("true");
 
   private ConnectionTable(TCPConduit c) throws IOException {
     this.owner = c;
@@ -226,10 +210,6 @@ public class ConnectionTable {
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
-    /*
-     * NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new InputMuxManager(this);
-     * inputMuxManager.start(c.logger); }
-     */
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -306,7 +286,6 @@ public class ConnectionTable {
       }
     }
 
-    // Stub id = conn.getRemoteId();
     if (conn != null) {
       synchronized (this.receivers) {
         this.owner.stats.incReceivers();
@@ -322,22 +301,9 @@ public class ConnectionTable {
             conn.remoteAddr);
       }
     }
-    // cleanupHighWater();
   }
 
 
-  // /** returns the connection associated with the given key, or null if
-  // no such connection exists */
-  // protected Connection basicGet(Serializable id) {
-  // synchronized (this.orderedConnectionMap) {
-  // return (Connection) this.orderedConnectionMap.get(id);
-  // }
-  // }
-
-  // protected Connection get(Serializable id) throws java.io.IOException {
-  // return get(id, false);
-  // }
-
 
   /**
    * Process a newly created PendingConnection
@@ -432,10 +398,10 @@ public class ConnectionTable {
   }
 
   /**
-   * unordered or conserve-sockets note that unordered connections are currently always shared
+   * unordered or conserve-sockets=true note that unordered connections are currently always shared
    * 
    * @param id the DistributedMember on which we are creating a connection
-   * @param threadOwnsResources whether unordered conn is owned by the current thread
+   * @param scheduleTimeout whether unordered connection should time out
    * @param preserveOrder whether to preserve order
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
@@ -444,9 +410,9 @@ public class ConnectionTable {
    * @throws IOException if unable to create the connection
    * @throws DistributedSystemDisconnectedException
    */
-  private Connection getUnorderedOrConserveSockets(DistributedMember id,
-      boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout,
-      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+  private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
+      boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+      throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
     final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
@@ -472,7 +438,7 @@ public class ConnectionTable {
     if (pc != null) {
       result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
           startTime, ackTimeout, ackSATimeout);
-      if (!preserveOrder && threadOwnsResources) {
+      if (!preserveOrder && scheduleTimeout) {
         scheduleIdleTimeout(result);
       }
     } else { // we have existing connection
@@ -487,10 +453,10 @@ public class ConnectionTable {
             startTime, ackTimeout, ackSATimeout);
         if (logger.isDebugEnabled()) {
           if (result != null) {
-            logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}", result,
+            logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
                 getConduit().getMemberId(), result.remoteAddr);
           } else {
-            logger.debug("getUnorderedOrConserveSockets: Connect failed");
+            logger.debug("getSharedConnection: Connect failed");
           }
         }
       } else {
@@ -512,7 +478,7 @@ public class ConnectionTable {
    * @throws IOException if the connection could not be created
    * @throws DistributedSystemDisconnectedException
    */
-  Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout,
+  Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
@@ -658,10 +624,10 @@ public class ConnectionTable {
     Connection result = null;
     boolean threadOwnsResources = threadOwnsResources();
     if (!preserveOrder || !threadOwnsResources) {
-      result = getUnorderedOrConserveSockets(id, threadOwnsResources, preserveOrder, startTime,
-          ackTimeout, ackSATimeout);
+      result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
+          ackSATimeout);
     } else {
-      result = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout);
+      result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
     }
     if (result != null) {
       Assert.assertTrue(result.preserveOrder == preserveOrder);

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 3872ee9..bf06953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -56,7 +56,7 @@ class DirectReplySender implements ReplySender {
     // mutates the list when it has exceptions.
 
     // fix for bug #42199 - cancellation check
-    this.conn.owner.getDM().getCancelCriterion().checkCancelInProgress(null);
+    this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
 
     if (logger.isTraceEnabled(LogMarker.DM)) {
       logger.trace(LogMarker.DM, "Sending a direct reply {} to {}", msg, conn.getRemoteAddress());

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index fc56271..be1f533 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
   public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
 
   protected DMStats getStats() {
-    return conn.owner.getConduit().stats;
+    return conn.getConduit().stats;
   }
 
   public static class Header {

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
index 50f5fae..a4e35a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
@@ -79,7 +79,7 @@ public class NIOMsgReader extends MsgReader {
     if (nioInputBuffer == null) {
       int allocSize = conn.getReceiveBufferSize();
       if (allocSize == -1) {
-        allocSize = conn.owner.getConduit().tcpBufferSize;
+        allocSize = conn.getConduit().tcpBufferSize;
       }
       if (allocSize > bufferSize) {
         bufferSize = allocSize;


[07/12] geode git commit: GEODE-2732 after auto-reconnect a server is restarted on the default port

Posted by kl...@apache.org.
GEODE-2732 after auto-reconnect a server is restarted on the default port

Now that gfsh cache server parameters are no longer held in a ThreadLocal
we need to clear the static variables holding the parameters in order
to avoid having one test affect another.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/799548ee
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/799548ee
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/799548ee

Branch: refs/heads/feature/GEODE-2632
Commit: 799548ee4e4d883309f83bf401d6af892b3abfc1
Parents: 19376d3
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Apr 6 15:46:31 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Apr 6 15:48:21 2017 -0700

----------------------------------------------------------------------
 .../geode/test/dunit/internal/JUnit4DistributedTestCase.java       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/799548ee/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
index 19949c2..5a679bb 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
@@ -34,6 +34,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.admin.ClientStatsManager;
+import org.apache.geode.internal.cache.CacheServerLauncher;
 import org.apache.geode.internal.cache.DiskStoreObserver;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.HARegion;
@@ -581,6 +582,7 @@ public abstract class JUnit4DistributedTestCase implements DistributedTestFixtur
     disconnectFromDS();
     // keep alphabetized to detect duplicate lines
     CacheCreation.clearThreadLocals();
+    CacheServerLauncher.clearStatics();
     CacheServerTestUtil.clearCacheReference();
     ClientProxyMembershipID.system = null;
     ClientServerTestCase.AUTO_LOAD_BALANCE = false;


[11/12] geode git commit: WIP refactoring

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7b923c..ab97b64 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -43,7 +43,6 @@ import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.ClientSession;
 import org.apache.geode.cache.DynamicRegionFactory;
@@ -111,7 +110,6 @@ import org.apache.shiro.util.ThreadState;
  * It queues messages to be sent from the server to the client. It then reads those messages from
  * the queue and sends them to the client.
  *
- *
  * @since GemFire 4.2
  */
 @SuppressWarnings("synthetic-access")
@@ -119,155 +117,127 @@ public class CacheClientProxy implements ClientSession {
   private static final Logger logger = LogService.getLogger();
 
   /**
-   * The socket between the server and the client
-   */
-  protected Socket _socket;
-
-  private final AtomicBoolean _socketClosed = new AtomicBoolean();
-
-  /**
-   * A communication buffer used by each message we send to the client
+   * Notify the region when a client interest registration occurs. This tells the region to update
+   * access time when an update is to be pushed to a client. It is enabled only for
+   * <code>PartitionedRegion</code>s currently.
    */
-  protected ByteBuffer _commBuffer;
+  private static final boolean NOTIFY_REGION_ON_INTEREST =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
 
   /**
-   * The remote host's IP address string (cached for convenience)
+   * The number of times to peek on shutdown before giving up and shutting down
    */
-  protected String _remoteHostAddress;
+  private static final int MAXIMUM_SHUTDOWN_PEEKS =
+      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50);
 
   /**
-   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+   * Default value for slow starting time of dispatcher
    */
-  protected volatile boolean isMarkedForRemoval = false;
+  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
 
   /**
-   * @see #isMarkedForRemoval
+   * Key in the system property from which the slow starting time value will be retrieved
    */
-  protected final Object isMarkedForRemovalLock = new Object();
+  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
 
   /**
-   * The proxy id of the client represented by this proxy
+   * TODO: delete this and rewrite the tests that use this. NOTE: this is NOT thread safe.
    */
-  protected ClientProxyMembershipID proxyID;
+  private static TestHook testHook;
 
   /**
-   * The GemFire cache
+   * TODO: delete this and rewrite the test that uses this. A debug flag used for testing backward
+   * compatibility.
    */
-  protected final GemFireCacheImpl _cache;
+  private static boolean afterMessageCreationForTesting = false;
 
   /**
-   * The list of keys that the client represented by this proxy is interested in (stored by region)
+   * TODO: delete this and rewrite the test that uses this. For testing purposes, delays the start
+   * of the dispatcher thread.
    */
-  protected final ClientInterestList[] cils = new ClientInterestList[2];
+  private static boolean isSlowStartForTesting = false;
 
-  /**
-   * A thread that dispatches messages to the client
-   */
-  protected volatile MessageDispatcher _messageDispatcher;
+  private final AtomicBoolean socketClosed = new AtomicBoolean();
 
   /**
-   * The statistics for this proxy
+   * @see #isMarkedForRemoval
    */
-  protected final CacheClientProxyStats _statistics;
-
-  protected final AtomicReference _durableExpirationTask = new AtomicReference();
-
-  protected SystemTimer durableTimer;
+  private final Object isMarkedForRemovalLock = new Object();
 
   /**
-   * Whether this dispatcher is paused
+   * The GemFire cache
    */
-  protected volatile boolean _isPaused = true;
+  private final GemFireCacheImpl cache;
 
   /**
-   * True if we are connected to a client.
-   */
-  private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
-  /**
-   * True if a marker message is still in the ha queue.
+   * The list of keys that the client represented by this proxy is interested in (stored by region)
    */
-  private boolean markerEnqueued = false;
+  private final ClientInterestList[] cils = new ClientInterestList[2];
 
   /**
-   * The number of times to peek on shutdown before giving up and shutting down
+   * The statistics for this proxy
    */
-  protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue();
+  private final CacheClientProxyStats _statistics;
 
-  /**
-   * The number of milliseconds to wait for an offering to the message queue
-   */
-  protected static final int MESSAGE_OFFER_TIME = 0;
-
-  /**
-   * The default maximum message queue size
-   */
-  // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+  private final AtomicReference _durableExpirationTask = new AtomicReference();
 
   /** The message queue size */
-  protected final int _maximumMessageCount;
+  private final int _maximumMessageCount;
 
   /**
    * The time (in seconds ) after which a message in the client queue will expire.
    */
-  protected final int _messageTimeToLive;
+  private final int _messageTimeToLive;
 
   /**
    * The <code>CacheClientNotifier</code> registering this proxy.
    */
-  protected final CacheClientNotifier _cacheClientNotifier;
+  private final CacheClientNotifier cacheClientNotifier;
 
-  /**
-   * Defaults to true; meaning do some logging of dropped client notification messages. Set the
-   * system property to true to cause dropped messages to NOT be logged.
-   */
-  protected static final boolean LOG_DROPPED_MSGS =
-      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings");
+  private final Object clientUserAuthsLock = new Object();
 
   /**
-   * for testing purposes, delays the start of the dispatcher thread
+   * The AcceptorImpl identifier to which the proxy is connected.
    */
-  public static boolean isSlowStartForTesting = false;
+  private final long _acceptorId;
 
-  /**
-   * Default value for slow starting time of dispatcher
-   */
-  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+  /** acceptor's setting for notifyBySubscription */
+  private final boolean notifyBySubscription;
+
+  private final Object queuedEventsSync = new Object();
 
   /**
-   * Key in the system property from which the slow starting time value will be retrieved
+   * A counter that keeps track of how many task iterations that have occurred since the last ping
+   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
+   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
    */
-  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+  private final AtomicInteger pingCounter = new AtomicInteger();
 
-  private boolean isPrimary;
+  private final Object drainLock = new Object();
 
-  /** @since GemFire 5.7 */
-  protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+  private final Object drainsInProgressLock = new Object();
+
+  private final SecurityService securityService;
 
   /**
-   * Flag to indicate whether to keep a durable client's queue alive
+   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
    */
-  boolean keepalive = false;
-
-  private AccessControl postAuthzCallback;
-  private Subject subject;
+  private volatile boolean isMarkedForRemoval = false;
 
   /**
-   * For multiuser environment..
+   * A thread that dispatches messages to the client
    */
-  private ClientUserAuths clientUserAuths;
+  private volatile MessageDispatcher _messageDispatcher;
 
-  private final Object clientUserAuthsLock = new Object();
+  /**
+   * Whether this dispatcher is paused
+   */
+  private volatile boolean _isPaused = true;
 
   /**
-   * The version of the client
+   * True if we are connected to a client.
    */
-  private Version clientVersion;
+  private volatile boolean connected = false;
 
   /**
    * A map of region name as key and integer as its value. Basically, it stores the names of the
@@ -278,42 +248,60 @@ public class CacheClientProxy implements ClientSession {
    */
   private volatile Map regionsWithEmptyDataPolicy = new HashMap();
 
+  /** To queue the events arriving during message dispatcher initialization */
+  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
+      new ConcurrentLinkedQueue<Conflatable>();
+
+  private volatile boolean messageDispatcherInit = false;
+
   /**
-   * A debug flag used for testing Backward compatibility
+   * The socket between the server and the client
    */
-  public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+  private Socket socket;
 
   /**
-   * Notify the region when a client interest registration occurs. This tells the region to update
-   * access time when an update is to be pushed to a client. It is enabled only for
-   * <code>PartitionedRegion</code>s currently.
+   * A communication buffer used by each message we send to the client
    */
-  protected static final boolean NOTIFY_REGION_ON_INTEREST =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
+  private ByteBuffer _commBuffer;
 
   /**
-   * The AcceptorImpl identifier to which the proxy is connected.
+   * The remote host's IP address string (cached for convenience)
    */
-  private final long _acceptorId;
+  private String _remoteHostAddress;
 
-  /** acceptor's setting for notifyBySubscription */
-  private final boolean notifyBySubscription;
+  /**
+   * The proxy id of the client represented by this proxy
+   */
+  private ClientProxyMembershipID proxyID;
 
-  /** To queue the events arriving during message dispatcher initialization */
-  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
-      new ConcurrentLinkedQueue<Conflatable>();
+  /**
+   * True if a marker message is still in the ha queue.
+   */
+  private boolean markerEnqueued = false;
 
-  private final Object queuedEventsSync = new Object();
+  private boolean isPrimary;
 
-  private volatile boolean messageDispatcherInit = false;
+  /** @since GemFire 5.7 */
+  private byte clientConflation = HandShake.CONFLATION_DEFAULT;
 
   /**
-   * A counter that keeps track of how many task iterations that have occurred since the last ping
-   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
-   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
+   * Flag to indicate whether to keep a durable client's queue alive
    */
-  private final AtomicInteger pingCounter = new AtomicInteger();
+  private boolean keepalive = false;
+
+  private AccessControl postAuthzCallback;
 
+  private Subject subject;
+
+  /**
+   * For multiuser environment..
+   */
+  private ClientUserAuths clientUserAuths;
+
+  /**
+   * The version of the client
+   */
+  private Version clientVersion;
 
   /** Date on which this instances was created */
   private Date creationDate;
@@ -321,52 +309,75 @@ public class CacheClientProxy implements ClientSession {
   /**
    * true when the durable client associated with this proxy is being restarted and prevents cqs
    * from being closed and drained
-   **/
+   */
   private boolean drainLocked = false;
-  private final Object drainLock = new Object();
 
   /** number of cq drains that are currently in progress **/
   private int numDrainsInProgress = 0;
-  private final Object drainsInProgressLock = new Object();
 
-  private SecurityService securityService = SecurityService.getSecurityService();
+  static CacheClientProxy createCacheClientProxy(final CacheClientNotifier ccn,
+      final GemFireCacheImpl cache, final StatisticsFactory statsFactory,
+      final SecurityService securityService, final Socket socket,
+      final ClientProxyMembershipID proxyID, final boolean isPrimary, final byte clientConflation,
+      final Version clientVersion, final long acceptorId, final boolean notifyBySubscription) {
+
+    CacheClientProxy cacheClientProxy =
+        new CacheClientProxy(ccn, cache, statsFactory, securityService, socket, proxyID, isPrimary,
+            clientConflation, clientVersion, acceptorId, notifyBySubscription);
+
+    // Create the interest list
+    cacheClientProxy.cils[RegisterInterestTracker.interestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.proxyID);
+    // Create the durable interest list
+    cacheClientProxy.cils[RegisterInterestTracker.durableInterestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.getDurableId());
+
+    return cacheClientProxy;
+  }
 
   /**
    * Constructor.
    *
    * @param ccn The <code>CacheClientNotifier</code> registering this proxy
+   * @param cache The GemFire cache
    * @param socket The socket between the server and the client
    * @param proxyID representing the Connection Proxy of the clien
    * @param isPrimary The boolean stating whether this prozxy is primary
-   * @throws CacheException {
-   */
-  protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
-      ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
-      Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException {
+   * @param clientConflation The client's conflation setting (a HandShake.CONFLATION_* value)
+   * @param clientVersion The version of the client
+   */
+  private CacheClientProxy(final CacheClientNotifier ccn, final GemFireCacheImpl cache,
+      final StatisticsFactory statsFactory, final SecurityService securityService,
+      final Socket socket, final ClientProxyMembershipID proxyID, final boolean isPrimary,
+      final byte clientConflation, final Version clientVersion, final long acceptorId,
+      final boolean notifyBySubscription) {
     initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
-    this._cacheClientNotifier = ccn;
-    this._cache = (GemFireCacheImpl) ccn.getCache();
+    this.cacheClientNotifier = ccn;
+    this.cache = cache;
+    this.securityService = securityService;
     this._maximumMessageCount = ccn.getMaximumMessageCount();
     this._messageTimeToLive = ccn.getMessageTimeToLive();
     this._acceptorId = acceptorId;
     this.notifyBySubscription = notifyBySubscription;
-    StatisticsFactory factory = this._cache.getDistributedSystem();
+
     this._statistics =
-        new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId()
-            + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
+        new CacheClientProxyStats(statsFactory, "id_" + this.proxyID.getDistributedMember().getId()
+            + "_at_" + this._remoteHostAddress + ":" + this.socket.getPort());
 
-    // Create the interest list
-    this.cils[RegisterInterestTracker.interestListIndex] =
-        new ClientInterestList(this, this.proxyID);
-    // Create the durable interest list
-    this.cils[RegisterInterestTracker.durableInterestListIndex] =
-        new ClientInterestList(this, this.getDurableId());
     this.postAuthzCallback = null;
-    this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+    this.cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
     this.creationDate = new Date();
     initializeClientAuths();
   }
 
+  boolean isClientConflationOn() {
+    return this.clientConflation == HandShake.CONFLATION_ON;
+  }
+
+  boolean isClientConflationDefault() {
+    return this.clientConflation == HandShake.CONFLATION_DEFAULT;
+  }
+
   private void initializeClientAuths() {
     if (AcceptorImpl.isPostAuthzCallbackPresent())
       this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
@@ -411,13 +422,13 @@ public class CacheClientProxy implements ClientSession {
 
   private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
       byte cc, Version vers) {
-    this._socket = socket;
+    this.socket = socket;
     this.proxyID = pid;
     this.connected = true;
     {
       int bufSize = 1024;
       try {
-        bufSize = _socket.getSendBufferSize();
+        bufSize = this.socket.getSendBufferSize();
         if (bufSize < 1024) {
           bufSize = 1024;
         }
@@ -450,7 +461,6 @@ public class CacheClientProxy implements ClientSession {
     return this.notifyBySubscription;
   }
 
-
   /**
    * Returns the DistributedMember represented by this proxy
    */
@@ -458,47 +468,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -522,11 +491,11 @@ public class CacheClientProxy implements ClientSession {
    * @return the socket between the server and the client
    */
   protected Socket getSocket() {
-    return this._socket;
+    return this.socket;
   }
 
   public String getSocketHost() {
-    return this._socket.getInetAddress().getHostAddress();
+    return this.socket.getInetAddress().getHostAddress();
   }
 
   protected ByteBuffer getCommBuffer() {
@@ -548,7 +517,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the remote host's port
    */
   public int getRemotePort() {
-    return this._socket.getPort();
+    return this.socket.getPort();
   }
 
   /**
@@ -593,7 +562,7 @@ public class CacheClientProxy implements ClientSession {
             this.isMarkedForRemovalLock.wait();
           } catch (InterruptedException e) {
             interrupted = true;
-            this._cache.getCancelCriterion().checkCancelInProgress(e);
+            this.cache.getCancelCriterion().checkCancelInProgress(e);
           }
         } // while
       } finally {
@@ -621,7 +590,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the GemFire cache
    */
   public GemFireCacheImpl getCache() {
-    return this._cache;
+    return this.cache;
   }
 
   public Set<String> getInterestRegisteredRegions() {
@@ -649,7 +618,7 @@ public class CacheClientProxy implements ClientSession {
    * @return this proxy's <code>CacheClientNotifier</code>
    */
   protected CacheClientNotifier getCacheClientNotifier() {
-    return this._cacheClientNotifier;
+    return this.cacheClientNotifier;
   }
 
   /**
@@ -852,8 +821,8 @@ public class CacheClientProxy implements ClientSession {
         }
       }
     } catch (Exception ex) {
-      if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
-        this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
+      if (this.cache.getSecurityLoggerI18n().warningEnabled()) {
+        this.cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
             new Object[] {this, ex});
       }
     }
@@ -991,9 +960,9 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
+    if (this.socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+      this.cacheClientNotifier.getSocketCloser().asyncClose(this.socket, this._remoteHostAddress,
           null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
@@ -1008,7 +977,7 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+        this.cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }
@@ -1124,7 +1093,7 @@ public class CacheClientProxy implements ClientSession {
       InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
     // Create a client interest message for the keyOfInterest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         policy.getOrdinal(), isDurable, !receiveValues, ClientInterestMessageImpl.REGISTER);
 
     // Notify all secondary proxies of a change in interest
@@ -1146,7 +1115,7 @@ public class CacheClientProxy implements ClientSession {
       String regionName, Object keyOfInterest) {
     // Get the initial value
     Get70 request = (Get70) Get70.getCommand();
-    LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+    LocalRegion lr = (LocalRegion) this.cache.getRegion(regionName);
     Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, null);
     boolean isObject = entry.isObject;
     byte[] value = null;
@@ -1170,7 +1139,7 @@ public class CacheClientProxy implements ClientSession {
       EventID eventId = null;
       if (clientInterestMessage == null) {
         // If the clientInterestMessage is null, create a new event id
-        eventId = new EventID(this._cache.getDistributedSystem());
+        eventId = new EventID(this.cache.getDistributedSystem());
       } else {
         // If the clientInterestMessage is not null, base the event id off its event id to fix
         // GEM-794.
@@ -1239,7 +1208,7 @@ public class CacheClientProxy implements ClientSession {
       boolean isDurable, boolean receiveValues, int interestType) {
     // Notify all secondary proxies of a change in interest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         (byte) 0, isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER);
     notifySecondariesOfInterestChange(message);
 
@@ -1269,17 +1238,9 @@ public class CacheClientProxy implements ClientSession {
           .append("->").append(InterestType.getString(message.getInterestType()));
       logger.debug(buffer.toString());
     }
-    this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+    this.cacheClientNotifier.deliverInterestChange(this.proxyID, message);
   }
 
-  /*
-   * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try {
-   * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch
-   * (RegionDestroyedException e) {
-   * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" +
-   * keyOfInterest, e); } }
-   */
-
   /**
    * Registers interest in the input region name and key
    *
@@ -1293,7 +1254,7 @@ public class CacheClientProxy implements ClientSession {
     cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
     if (flushState) {
       flushForInterestRegistration(regionName,
-          this._cache.getDistributedSystem().getDistributedMember());
+          this.cache.getDistributedSystem().getDistributedMember());
     }
     HARegionQueue queue = getHARegionQueue();
     if (queue != null) { // queue is null during initialization
@@ -1306,7 +1267,7 @@ public class CacheClientProxy implements ClientSession {
    * interest. During queue creation it is the queue's image provider.
    */
   public void flushForInterestRegistration(String regionName, DistributedMember target) {
-    Region r = this._cache.getRegion(regionName);
+    Region r = this.cache.getRegion(regionName);
     if (r == null) {
       if (logger.isDebugEnabled()) {
         logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
@@ -1320,7 +1281,7 @@ public class CacheClientProxy implements ClientSession {
       if (r instanceof PartitionedRegion) {
         // need to flush all buckets. SFO should be changed to target buckets
         // belonging to a particular PR, but it doesn't have that option right now
-        sfo = new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager());
+        sfo = new StateFlushOperation(this.cache.getDistributedSystem().getDistributionManager());
       } else {
         sfo = new StateFlushOperation((DistributedRegion) r);
       }
@@ -1378,7 +1339,7 @@ public class CacheClientProxy implements ClientSession {
     if (getHARegionQueue() != null) {
       if (flushState) {
         flushForInterestRegistration(regionName,
-            this._cache.getDistributedSystem().getDistributedMember());
+            this.cache.getDistributedSystem().getDistributedMember());
       }
       getHARegionQueue().setHasRegisteredInterest(true);
     }
@@ -1643,7 +1604,7 @@ public class CacheClientProxy implements ClientSession {
     if (logger.isDebugEnabled()) {
       logger.debug("About to send message directly to {}", this);
     }
-    if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+    if (this._messageDispatcher != null && this.socket != null && !this.socket.isClosed()) {
       // If the socket is open, send the message to it
       this._messageDispatcher.sendMessageDirectly(message);
       if (logger.isDebugEnabled()) {
@@ -1759,7 +1720,7 @@ public class CacheClientProxy implements ClientSession {
     if (this.isPrimary) {
       // Add the marker to the queue
       if (!processedMarker) {
-        EventID eventId = new EventID(this._cache.getDistributedSystem());
+        EventID eventId = new EventID(this.cache.getDistributedSystem());
         this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
       }
 
@@ -1810,30 +1771,18 @@ public class CacheClientProxy implements ClientSession {
   @Override
   public String toString() {
     StringBuffer buffer = new StringBuffer();
-    buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
-        .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
-        .append("; version=").append(clientVersion).append("]");
+    buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+        .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+        .append(clientVersion).append("]");
     return buffer.toString();
   }
 
   public String getState() {
     StringBuffer buffer = new StringBuffer();
-    buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
-        .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
-        .append("; version=").append(clientVersion).append("; paused=").append(isPaused())
-        .append("; alive=").append(isAlive()).append("; connected=").append(isConnected())
+    buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+        .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+        .append(clientVersion).append("; paused=").append(isPaused()).append("; alive=")
+        .append(isAlive()).append("; connected=").append(isConnected())
         .append("; isMarkedForRemoval=").append(isMarkedForRemoval).append("]");
 
     if (_messageDispatcher != null && isAlive()) {
@@ -1844,15 +1793,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public boolean isPrimary() {
-    // boolean primary = this._messageDispatcher.isAlive()
-    // || this._messageDispatcher._messageQueue.isPrimary();
-    boolean primary = this.isPrimary;
-    // System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
-    // System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " +
-    // this._messageDispatcher._messageQueue.isPrimary());
-    // System.out.println(this + ": IS PRIMARY: " + primary);
-    return primary;
-    // return this.isPrimary ;
+    return this.isPrimary;
   }
 
   protected boolean basicIsPrimary() {
@@ -1863,16 +1804,10 @@ public class CacheClientProxy implements ClientSession {
     this.isPrimary = isPrimary;
   }
 
-  // private static int nextId = 0;
-  // static protected int getNextId() {
-  // synchronized (CacheClientProxy.class) {
-  // return ++nextId;
-  // }
-  // }
-  /*
+  /**
    * Return this client's HA region queue
    * 
-   * @returns - HARegionQueue of the client
+   * @return HARegionQueue of the client
    */
   public HARegionQueue getHARegionQueue() {
     if (this._messageDispatcher != null) {
@@ -1881,7 +1816,6 @@ public class CacheClientProxy implements ClientSession {
     return null;
   }
 
-
   /**
    * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
    * 
@@ -1952,7 +1886,7 @@ public class CacheClientProxy implements ClientSession {
 
         // Close the proxy
         terminateDispatching(false);
-        _cacheClientNotifier._statistics.incQueueDroppedCount();
+        cacheClientNotifier._statistics.incQueueDroppedCount();
 
         /**
          * Setting the expiration task to null again and cancelling existing one, if any. See
@@ -1976,7 +1910,7 @@ public class CacheClientProxy implements ClientSession {
 
     };
     if (this._durableExpirationTask.compareAndSet(null, task)) {
-      _cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
+      cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
     }
   }
 
@@ -1992,11 +1926,131 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
+  public static void setTestHook(TestHook value) {
+    testHook = value;
+  }
+
+  public static void unsetTestHook() {
+    testHook = null;
+  }
+
+  public static TestHook getTestHook() {
+    return testHook;
+  }
+
+  public static void setSlowStartForTesting() {
+    isSlowStartForTesting = true;
+  }
+
+  static void unsetSlowStartForTesting() {
+    isSlowStartForTesting = false;
+  }
+
+  static void setAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = true;
+  }
+
+  static void unsetAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = false;
+  }
+
+  Socket getSocketForTesting() {
+    return this.socket;
+  }
+
+  ClientInterestList[] getClientInterestListForTesting() {
+    return this.cils;
+  }
+
+  MessageDispatcher getMessageDispatcherForTesting() {
+    return this._messageDispatcher;
+  }
+
+  /**
+   * Returns the current number of CQs the client has installed.
+   *
+   * @return int the current count of CQs for this client
+   */
+  public int getCqCount() {
+    synchronized (this) {
+      return this._statistics.getCqCount();
+    }
+  }
+
+  /**
+   * Increment the number of CQs the client installed
+   *
+   */
+  public void incCqCount() {
+    synchronized (this) {
+      this._statistics.incCqCount();
+    }
+  }
+
+  /**
+   * Decrement the number of CQs the client installed
+   *
+   */
+  public synchronized void decCqCount() {
+    synchronized (this) {
+      this._statistics.decCqCount();
+    }
+  }
+
+  /**
+   * Returns true if the client has one CQ
+   *
+   * @return true if the client has one CQ
+   */
+  public boolean hasOneCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 1;
+    }
+  }
+
+  /**
+   * Returns true if the client has no CQs
+   *
+   * @return true if the client has no CQs
+   */
+  public boolean hasNoCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 0;
+    }
+  }
+
+  /**
+   * Get map of regions with empty data policy
+   *
+   * @since GemFire 6.1
+   */
+  public Map getRegionsWithEmptyDataPolicy() {
+    return regionsWithEmptyDataPolicy;
+  }
+
+  public int incrementAndGetPingCounter() {
+    int pingCount = this.pingCounter.incrementAndGet();
+    return pingCount;
+  }
+
+  public void resetPingCounter() {
+    this.pingCounter.set(0);
+  }
+
+  /**
+   * Returns the number of seconds that have elapsed since the client proxy was created.
+   *
+   * @since GemFire 7.0
+   */
+  public long getUpTime() {
+    return (System.currentTimeMillis() - this.creationDate.getTime()) / 1000;
+  }
+
   /**
    * Class <code>ClientInterestList</code> provides a convenient interface for manipulating client
    * interest information.
    */
-  static protected class ClientInterestList {
+  static class ClientInterestList {
 
     final CacheClientProxy ccp;
 
@@ -2031,7 +2085,7 @@ public class CacheClientProxy implements ClientSession {
       }
       Set keysRegistered = null;
       synchronized (this.interestListLock) {
-        LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true);
+        LocalRegion r = (LocalRegion) this.ccp.cache.getRegion(regionName, true);
         if (r == null) {
           throw new RegionDestroyedException("Region could not be found for interest registration",
               regionName);
@@ -2055,7 +2109,7 @@ public class CacheClientProxy implements ClientSession {
 
     protected FilterProfile getProfile(String regionName) {
       try {
-        return this.ccp._cache.getFilterProfile(regionName);
+        return this.ccp.cache.getFilterProfile(regionName);
       } catch (CancelException e) {
         return null;
       }
@@ -2221,7 +2275,6 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-
   /**
    * Class <code>MessageDispatcher</code> is a <code>Thread</code> that processes messages bound for
    * the client by taking messsages from the message queue and sending them to the client over the
@@ -2234,34 +2287,17 @@ public class CacheClientProxy implements ClientSession {
      */
     protected final HARegionQueue _messageQueue;
 
-    // /**
-    // * An int used to keep track of the number of messages dropped for logging
-    // * purposes. If greater than zero then a warning has been logged about
-    // * messages being dropped.
-    // */
-    // private int _numberOfMessagesDropped = 0;
-
     /**
      * The proxy for which this dispatcher is processing messages
      */
     private final CacheClientProxy _proxy;
 
-    // /**
-    // * The conflator faciliates message conflation
-    // */
-    // protected BridgeEventConflator _eventConflator;
-
     /**
      * Whether the dispatcher is stopped
      */
     private volatile boolean _isStopped = true;
 
     /**
-     * guarded.By _pausedLock
-     */
-    // boolean _isPausedDispatcher = false;
-
-    /**
      * A lock object used to control pausing this dispatcher
      */
     protected final Object _pausedLock = new Object();
@@ -2274,11 +2310,6 @@ public class CacheClientProxy implements ClientSession {
     private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
 
     private final Lock socketWriteLock = socketLock.writeLock();
-    // /**
-    // * A boolean verifying whether a warning has already been issued if the
-    // * message queue has reached its capacity.
-    // */
-    // private boolean _messageQueueCapacityReachedWarning = false;
 
     /**
      * Constructor.
@@ -2303,7 +2334,7 @@ public class CacheClientProxy implements ClientSession {
         HARegionQueueAttributes harq = new HARegionQueueAttributes();
         harq.setBlockingQueueCapacity(proxy._maximumMessageCount);
         harq.setExpiryTime(proxy._messageTimeToLive);
-        ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
+        ((HAContainerWrapper) proxy.cacheClientNotifier.getHaContainer())
             .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());
         boolean createDurableQueue = proxy.proxyID.isDurable();
         boolean canHandleDelta = (proxy.clientVersion.compareTo(Version.GFE_61) >= 0)
@@ -2314,7 +2345,7 @@ public class CacheClientProxy implements ClientSession {
         }
         this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
             getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue,
-            proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
+            proxy.cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
             this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta);
         // Check if interests were registered during HARegion GII.
         if (this._proxy.hasRegisteredInterested()) {
@@ -2405,10 +2436,6 @@ public class CacheClientProxy implements ClientSession {
             Thread.sleep(500);
           } catch (InterruptedException e) {
             interrupted = true;
-            /*
-             * GemFireCache c = (GemFireCache)_cache;
-             * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);
-             */
           } catch (CancelException e) {
             break;
           } catch (CacheException e) {
@@ -2503,7 +2530,7 @@ public class CacheClientProxy implements ClientSession {
       ClientMessage clientMessage = null;
       while (!isStopped()) {
         // SystemFailure.checkFailure(); DM's stopper does this
-        if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
+        if (this._proxy.cache.getCancelCriterion().isCancelInProgress()) {
           break;
         }
         try {
@@ -2752,9 +2779,6 @@ public class CacheClientProxy implements ClientSession {
       }
       Message message = null;
 
-      // byte[] latestValue =
-      // this._eventConflator.getLatestValue(clientMessage);
-
       if (clientMessage instanceof ClientUpdateMessage) {
         byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue();
         if (logger.isTraceEnabled()) {
@@ -2771,7 +2795,7 @@ public class CacheClientProxy implements ClientSession {
 
         message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue);
 
-        if (AFTER_MESSAGE_CREATION_FLAG) {
+        if (afterMessageCreationForTesting) {
           ClientServerObserver bo = ClientServerObserverHolder.getInstance();
           bo.afterMessageCreation(message);
         }
@@ -2779,37 +2803,9 @@ public class CacheClientProxy implements ClientSession {
         message = clientMessage.getMessage(getProxy(), true /* notify */);
       }
 
-      // //////////////////////////////
-      // TEST CODE BEGIN (Throws exception to test closing proxy)
-      // if (true) throw new IOException("test");
-      // TEST CODE END
-      // //////////////////////////////
-      // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-      // latestValue);
-      // Message message = clientMessage.getMessage(); removed during merge.
-      // BugFix for BUG#38206 and BUG#37791
       if (!this._proxy.isPaused()) {
         sendMessage(message);
 
-        // //////////////////////////////
-        // TEST CODE BEGIN (Throws exception to test closing proxy)
-        // if (true) throw new IOException("test");
-        // TEST CODE END
-        // //////////////////////////////
-        // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-        // latestValue);
-        // Message message = clientMessage.getMessage(); removed during merge.
-        // message.setComms(getSocket(), getCommBuffer(), getStatistics());
-        // message.send();
-
-        // //////////////////////////////
-        // TEST CODE BEGIN (Introduces random wait in client)
-        // Sleep a random number of ms
-        // java.util.Random rand = new java.util.Random();
-        // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {}
-        // TEST CODE END
-        // //////////////////////////////
-
         if (logger.isTraceEnabled()) {
           logger.trace("{}: Dispatched {}", this, clientMessage);
         }
@@ -2851,7 +2847,7 @@ public class CacheClientProxy implements ClientSession {
       try {
         this._messageQueue.put(clientMessage);
         if (this._proxy.isPaused() && this._proxy.isDurable()) {
-          this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
+          this._proxy.cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
           if (logger.isDebugEnabled()) {
             logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
           }
@@ -2955,7 +2951,7 @@ public class CacheClientProxy implements ClientSession {
       this._pausedLock.notifyAll();
     }
 
-    protected Object deserialize(byte[] serializedBytes) {
+    private Object deserialize(byte[] serializedBytes) {
       Object deserializedObject = serializedBytes;
       // This is a debugging method so ignore all exceptions like
       // ClassNotFoundException
@@ -2979,89 +2975,7 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  /**
-   * Returns the current number of CQS the client installed.
-   *
-   * @return int the current count of CQs for this client
-   */
-  public int getCqCount() {
-    synchronized (this) {
-      return this._statistics.getCqCount();
-    }
-  }
-
-  /**
-   * Increment the number of CQs the client installed
-   *
-   */
-  public void incCqCount() {
-    synchronized (this) {
-      this._statistics.incCqCount();
-    }
-  }
-
-  /**
-   * Decrement the number of CQs the client installed
-   *
-   */
-  public synchronized void decCqCount() {
-    synchronized (this) {
-      this._statistics.decCqCount();
-    }
-  }
-
-  /**
-   * Returns true if the client has one CQ
-   *
-   * @return true if the client has one CQ
-   */
-  public boolean hasOneCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 1;
-    }
-  }
-
-  /**
-   * Returns true if the client has no CQs
-   *
-   * @return true if the client has no CQs
-   */
-  public boolean hasNoCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 0;
-    }
-  }
-
-  /**
-   * Get map of regions with empty data policy
-   *
-   * @since GemFire 6.1
-   */
-  public Map getRegionsWithEmptyDataPolicy() {
-    return regionsWithEmptyDataPolicy;
-  }
-
-  public int incrementAndGetPingCounter() {
-    int pingCount = this.pingCounter.incrementAndGet();
-    return pingCount;
-  }
-
-  public void resetPingCounter() {
-    this.pingCounter.set(0);
-  }
-
-  /**
-   * Returns the number of seconds that have elapsed since the Client proxy created.
-   * 
-   * @since GemFire 7.0
-   */
-  public long getUpTime() {
-    return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000);
-  }
-
   public interface TestHook {
-    public void doTestHook(String spot);
+    void doTestHook(String spot);
   }
-
-  public static TestHook testHook;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e21a834..6e8f9ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -22,7 +22,11 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CacheClientStatus;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.IncomingGatewayStatus;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -32,7 +36,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.logging.log4j.Logger;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 6bbe7b8..7d1603d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -352,8 +352,8 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     byte[] serializedValue = null;
     Message message = null;
     boolean conflation = false;
-    conflation = (proxy.clientConflation == HandShake.CONFLATION_ON)
-        || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && this.shouldBeConflated());
+    conflation = (proxy.isClientConflationOn())
+        || (proxy.isClientConflationDefault() && this.shouldBeConflated());
 
     if (latestValue != null) {
       serializedValue = latestValue;

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 6e119c0..8a51c31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -62,6 +62,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireException;
+import org.apache.geode.LogWriter;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
@@ -1669,8 +1670,8 @@ public class HandShake implements ClientHandShake {
    * not
    */
   public static Object verifyCredentials(String authenticatorMethod, Properties credentials,
-      Properties securityProperties, InternalLogWriter logWriter,
-      InternalLogWriter securityLogWriter, DistributedMember member)
+      Properties securityProperties, LogWriter logWriter, LogWriter securityLogWriter,
+      DistributedMember member)
       throws AuthenticationRequiredException, AuthenticationFailedException {
 
     if (!AcceptorImpl.isAuthenticationRequired()) {
@@ -1702,8 +1703,8 @@ public class HandShake implements ClientHandShake {
 
     String methodName = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     return verifyCredentials(methodName, this.credentials, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), this.id.getDistributedMember());
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(),
+        this.id.getDistributedMember());
   }
 
   public void sendCredentialsForWan(OutputStream out, InputStream in) {
@@ -1731,8 +1732,7 @@ public class HandShake implements ClientHandShake {
     String authenticator = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     Properties peerWanProps = readCredentials(dis, dos, this.system);
     verifyCredentials(authenticator, peerWanProps, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), member);
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(), member);
   }
 
   private static int getKeySize(String skAlgo) {

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
index 5a229d3..dfce317 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
@@ -151,6 +151,16 @@ public class LogService extends LogManager {
   }
 
   /**
+   * Returns a Logger with the name of {@link #SECURITY_LOGGER_NAME}.
+   *
+   * @return The security Logger.
+   */
+  public static Logger getSecurityLogger() {
+    return new FastLogger(
+        LogManager.getLogger(SECURITY_LOGGER_NAME, GemFireParameterizedMessageFactory.INSTANCE));
+  }
+
+  /**
    * Returns a LogWriterLogger that is decorated with the LogWriter and LogWriterI18n methods.
    * <p>
    * This is the bridge to LogWriter and LogWriterI18n that we need to eventually stop using in

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7aa11b7..8ff541b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -14,21 +14,14 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
@@ -39,7 +32,6 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.BindException;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -85,7 +77,8 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
       } catch (IllegalArgumentException expected) {
       }
@@ -95,7 +88,7 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST,
-            CacheServer.DEFAULT_TCP_NO_DELAY);
+            CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns of zero");
       } catch (IllegalArgumentException expected) {
       }
@@ -105,12 +98,14 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         fail("Expecetd a BindException while attaching to the same port");
       } catch (BindException expected) {
       }
@@ -119,7 +114,8 @@ public class AcceptorImplJUnitTest {
           CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
-          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+          this.cache.getCancelCriterion());
       assertEquals(port2, a3.getPort());
       InternalDistributedSystem isystem =
           (InternalDistributedSystem) this.cache.getDistributedSystem();

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
new file mode 100644
index 0000000..7ab539d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+@Category(UnitTest.class)
+public class AcceptorImplTest {
+
+  @Before
+  public void before() throws Exception {
+    DistributionConfigImpl distributionConfig = new DistributionConfigImpl(new Properties());
+    SocketCreatorFactory.setDistributionConfig(distributionConfig);
+  }
+
+  @After
+  public void after() throws Exception {
+    SocketCreatorFactory.close();
+  }
+
+  @Test
+  public void constructWithDefaults() throws Exception {
+    /*
+     * Problems:
+     * 
+     * this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
+     * messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
+     * 
+     * this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+     * this.clientNotifier.getStats());
+     * 
+     * LoggingThreadGroup / ThreadFactory / ThreadPoolExecutor
+     * 
+     * isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+     * 
+     * isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+     * 
+     * 
+     * String postAuthzFactoryName =
+     * this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+     * 
+     */
+
+    int port = 0;
+    String bindHostName = SocketCreator.getLocalHost().getHostName();
+    boolean notifyBySubscription = false;
+    int socketBufferSize = 1;
+    int maximumTimeBetweenPings = 0;
+    InternalCache internalCache = null;
+    int maxConnections = 0;
+    int maxThreads = 0;
+    int maximumMessageCount = 0;
+    int messageTimeToLive = 0;
+    ConnectionListener listener = null;
+    List overflowAttributesList = null;
+    boolean isGatewayReceiver = false;
+    List<GatewayTransportFilter> transportFilter = Collections.emptyList();
+    boolean tcpNoDelay = false;
+    CancelCriterion cancelCriterion = null;
+
+    AcceptorImpl acceptor = new AcceptorImpl(port, bindHostName, notifyBySubscription,
+        socketBufferSize, maximumTimeBetweenPings, internalCache, maxConnections, maxThreads,
+        maximumMessageCount, messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver,
+        transportFilter, tcpNoDelay, cancelCriterion);
+
+    assertThat(acceptor).isNotNull();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
index 31f67aa..f4a8cc8 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
@@ -99,7 +99,7 @@ public class ClientConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void setIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", "15000");
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
index 1a76daa..efc0367 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -166,7 +166,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void installObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+    CacheClientProxy.setAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new DelaySendingEvent());
   }
 
@@ -176,7 +176,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void cleanupObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+    CacheClientProxy.unsetAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index b4f3185..43330a5 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -62,13 +62,9 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
@@ -1014,7 +1010,8 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
       while (iter_prox.hasNext()) {
         CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
         // CCP should not contain region1
-        Set akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions;
+        Set akr = ccp
+            .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].regions;
         assertNotNull(akr);
         assertTrue(!akr.contains(Region.SEPARATOR + REGION_NAME1));
         // CCP should contain region2
@@ -1352,7 +1349,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
    *
    */
   public static void unsetSlowDispatcherFlag() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
index b1e16ee..275e458 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
@@ -112,7 +112,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    * 
    */
   public static void setIsSlowStart(String milis) {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", milis);
   }
 
@@ -121,7 +121,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void unsetIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
index 544f732..9d60cc7 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -459,7 +459,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
         iter_prox = ccn.getClientProxies().iterator();
         if (iter_prox.hasNext()) {
           proxy = (CacheClientProxy) iter_prox.next();
-          return proxy._messageDispatcher.isAlive();
+          return proxy.getMessageDispatcherForTesting().isAlive();
         } else {
           return false;
         }
@@ -510,7 +510,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
     if (iter_prox.hasNext()) {
       CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
       assertFalse("Dispatcher on secondary should not be alive",
-          proxy._messageDispatcher.isAlive());
+          proxy.getMessageDispatcherForTesting().isAlive());
     }
   }
 
@@ -818,8 +818,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+          Set keysMap =
+              (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                  .getProfile(Region.SEPARATOR + REGION_NAME)
+                  .getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null && keysMap.size() == 2;
         }
 
@@ -830,8 +832,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-          .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+      Set keysMap =
+          (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(2, keysMap.size());
       assertTrue(keysMap.contains(k1));
@@ -879,8 +882,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+          Set keysMap =
+              (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                  .getProfile(Region.SEPARATOR + REGION_NAME)
+                  .getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null;
         }
 
@@ -891,8 +896,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-          .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+      Set keysMap =
+          (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(1, keysMap.size());
       assertFalse(keysMap.contains(k1));

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
index 6aea509..3585c3e 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
@@ -471,7 +471,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -529,7 +529,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
     } catch (Exception ex) {
       fail("while setting verifyDispatcherIsNotAlive  " + ex);

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
index 041cd38..be9265b 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
@@ -434,8 +434,8 @@ public class InterestListRecoveryDUnitTest extends JUnit4DistributedTestCase {
   public static Set getKeysOfInterestMap(CacheClientProxy proxy, String regionName) {
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]);
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]._keysOfInterest);
-    return proxy.cils[RegisterInterestTracker.interestListIndex].getProfile(regionName)
-        .getKeysOfInterestFor(proxy.getProxyID());
+    return proxy.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+        .getProfile(regionName).getKeysOfInterestFor(proxy.getProxyID());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
index 4a98298..1635fca 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
@@ -189,10 +189,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            if (proxy._messageDispatcher == null) {
+            if (proxy.getMessageDispatcherForTesting() == null) {
               return false;
             }
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -245,7 +245,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
 
     } catch (Exception ex) {
@@ -427,8 +427,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-                .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+            Set keysMap = (Set) ccp
+                .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                    .getProfile(Region.SEPARATOR + REGION_NAME)
+                    .getKeysOfInterestFor(ccp.getProxyID());
             if (keysMap == null) {
               excuse = "keys of interest is null";
               return false;
@@ -446,8 +448,9 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
         };
         Wait.waitForCriterion(wc, 180 * 1000, 2 * 1000, true);
 
-        Set keysMap = ccp.cils[RegisterInterestTracker.interestListIndex]
-            .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+        Set keysMap =
+            ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
         assertTrue(keysMap.contains(k1));
         assertTrue(keysMap.contains(k2));
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
new file mode 100644
index 0000000..94a2e8b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class Put65BenchTest {
+
+  public Command put65Command;
+  public ServerConnection mockServerConnection;
+  public Message mockMessage;
+
+  @Before
+  public void setup() throws Exception {
+    loadEmergencyClasses();
+
+    this.put65Command = Put65.getCommand();
+
+    this.mockServerConnection =
+        mock(ServerConnection.class, withSettings().name("mockServerConnection"));
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache"));
+    when(this.mockServerConnection.getCache()).thenReturn(mockCache);
+
+    TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager"));
+    when(mockCache.getTxManager()).thenReturn(mockTxManager);
+
+    CacheServerStats mockCacheServerStats =
+        mock(CacheServerStats.class, withSettings().name("mockCacheServerStats"));
+    when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats);
+
+    ClientProxyMembershipID mockProxyId =
+        mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+    Message mockErrorResponseMessage =
+        mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+    when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(mockErrorResponseMessage);
+
+    Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+    when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+    Part mockOperationPart = mock(Part.class, withSettings().name("mockOperationPart"));
+    when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part mockFlagsPart = mock(Part.class, withSettings().name("mockFlagsPart"));
+    when(mockFlagsPart.getInt()).thenReturn(0);
+
+    Part mockKeyPart = mock(Part.class, withSettings().name("mockKeyPart"));
+    when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+    when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+    Part mockIsDeltaPart = mock(Part.class, withSettings().name("mockIsDeltaPart"));
+    when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part mockValuePart = mock(Part.class, withSettings().name("mockValuePart"));
+    when(mockValuePart.getObject()).thenReturn("mockValuePart");
+
+    Part mockEventPart = mock(Part.class, withSettings().name("mockEventPart"));
+    when(mockEventPart.getObject()).thenReturn("mockEventPart");
+
+    Part mockCallbackArgPart = mock(Part.class, withSettings().name("mockCallbackArgPart"));
+    when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
+
+    this.mockMessage = mock(Message.class, withSettings().name("mockMessage"));
+
+    when(this.mockMessage.getTransactionId()).thenReturn(NOTX);
+
+    when(this.mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+    when(this.mockMessage.getPart(1)).thenReturn(mockOperationPart);
+    when(this.mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+    when(this.mockMessage.getPart(3)).thenReturn(mockKeyPart);
+    when(this.mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+    when(this.mockMessage.getPart(5)).thenReturn(mockValuePart);
+    when(this.mockMessage.getPart(6)).thenReturn(mockEventPart);
+    when(this.mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
+  }
+
+  @Test
+  public void benchmark() {
+    this.put65Command.execute(this.mockMessage, this.mockServerConnection);
+    // Message replyMessage = state.mockServerConnection.getReplyMessage();
+    // blackhole.consume(replyMessage);
+  }
+}


[04/12] geode git commit: GEODE-2732 after auto-reconnect a server is restarted on the default port

Posted by kl...@apache.org.
GEODE-2732 after auto-reconnect a server is restarted on the default port

Changes to the new test based on feedback from Galen


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/669d3ed1
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/669d3ed1
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/669d3ed1

Branch: refs/heads/feature/GEODE-2632
Commit: 669d3ed1f2ab7e05edfa15bb19b5782ebc05d753
Parents: 742c8f2
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 16:49:27 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 16:49:27 2017 -0700

----------------------------------------------------------------------
 .../cache30/ReconnectWithCacheXMLDUnitTest.java  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/669d3ed1/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
index 4f2fac1..2167a06 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
@@ -26,6 +26,8 @@ import org.apache.geode.distributed.internal.membership.MembershipTestHook;
 import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.CacheServerLauncher;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -38,6 +40,11 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * This test exercises auto-reconnect functionality when there is a cache-server that was started by
+ * gfsh but was configured both by gfsh and a cache.xml file. The JIRA ticket for this is
+ * GEODE-2732.
+ */
 @Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
 public class ReconnectWithCacheXMLDUnitTest extends JUnit4CacheTestCase {
 
@@ -96,6 +103,18 @@ public class ReconnectWithCacheXMLDUnitTest extends JUnit4CacheTestCase {
     MembershipManagerHelper.crashDistributedSystem(cache.getDistributedSystem());
     assertTrue(membershipFailed.get());
 
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return cache.getReconnectedCache() != null;
+      }
+
+      @Override
+      public String description() {
+        return "waiting for cache to reconnect";
+      }
+    };
+    Wait.waitForCriterion(wc, 60000, 5000, true);
     await().atMost(60, TimeUnit.SECONDS).until(() -> cache.getReconnectedCache() != null);
 
     Cache newCache = cache.getReconnectedCache();


[09/12] geode git commit: Create ClientCachePutBench

Posted by kl...@apache.org.
Create ClientCachePutBench


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0f4f2940
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0f4f2940
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0f4f2940

Branch: refs/heads/feature/GEODE-2632
Commit: 0f4f29408a10030fa46f09b6c64eb62f81396e60
Parents: d497d63
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Apr 3 14:52:28 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Fri Apr 7 12:36:31 2017 -0700

----------------------------------------------------------------------
 geode-core/build.gradle                         |   5 +
 .../sockets/command/ClientCachePutBench.java    | 174 +++++++++++++++++++
 .../cache/tier/sockets/command/Put65Bench.java  | 120 +++++++++++++
 .../command/ClientCachePutBench-server.xml      |  29 ++++
 .../internal/ClusterConfigurationService.java   |  20 ++-
 .../cache/tier/sockets/BaseCommand.java         |  35 ++--
 .../tier/sockets/ClientProxyMembershipID.java   |   5 +-
 .../geode/distributed/ServerLauncherUtils.java  |  30 ++++
 .../cache/tier/sockets/CacheServerUtils.java    |  55 ++++++
 .../command/ExperimentIntegrationTest.java      |  80 +++++++++
 .../tier/sockets/command/ExperimentTest.java    | 121 +++++++++++++
 11 files changed, 646 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/build.gradle
----------------------------------------------------------------------
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 757599a..fd56fe1 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -17,6 +17,7 @@
 
 
 apply plugin: 'antlr'
+apply plugin: 'me.champeau.gradle.jmh'
 
 sourceSets {
   jca {
@@ -220,5 +221,9 @@ dependencies {
   classesOutput sourceSets.main.output
 }
 
+jmh {
+  duplicateClassesStrategy = 'warn'
+}
+
 tasks.eclipse.dependsOn(generateGrammarSource)
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
new file mode 100644
index 0000000..a1cbd81
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.commons.io.FileUtils.*;
+import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.net.SocketCreator;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JMH benchmark measuring the throughput of {@code Region.put} issued from a client cache against
+ * a cache server that is forked into its own JVM for the duration of a trial.
+ */
+@Measurement(iterations = 5)
+@Warmup(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Thread)
+@SuppressWarnings("unused")
+public class ClientCachePutBench {
+
+  /**
+   * Sanity check that the server cache.xml used by the benchmark can be found on the classpath.
+   */
+  @Test
+  public void tempTest() throws Exception {
+    String SERVER_XML_FILE_NAME =
+        "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+            + "/ClientCachePutBench-server.xml";
+    assertThat(new File(getClass().getResource(SERVER_XML_FILE_NAME).getFile())).exists();
+  }
+
+  /**
+   * Trial-scoped state: forks a cache server process, waits for it to come online and connects a
+   * client cache with a PROXY region to it.
+   */
+  @State(Scope.Benchmark)
+  public static class ClientState {
+    public static final String REGION_NAME = "clientCachePutBench-region";
+
+    /** Upper bound on how long {@link #startServer()} waits for the forked server. */
+    private static final long SERVER_START_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
+
+    /** Pause between two status checks of the forked server. */
+    private static final long SERVER_STATUS_POLL_MILLIS = 100;
+
+    public Random random;
+
+    public int serverPort;
+    public Process process;
+    public ServerLauncher launcher;
+    public File serverDirectory;
+
+    public ClientCache clientCache;
+    public Region<String, String> region;
+
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    /**
+     * Copies the server cache.xml into a temporary working directory, forks a server JVM on a
+     * random available port, waits until that server is online and then creates the client cache
+     * and region used by the benchmark.
+     *
+     * @throws Exception if the server cannot be forked or does not come online in time
+     */
+    @Setup(Level.Trial)
+    public void startServer() throws Exception {
+      this.random = new Random(System.nanoTime());
+
+      this.temporaryFolder.create();
+      this.serverDirectory = this.temporaryFolder.getRoot();
+
+      String SERVER_XML_FILE_NAME =
+          "/" + StringUtils.replace(ClientCachePutBench.class.getPackage().getName(), ".", "/")
+              + "/ClientCachePutBench-server.xml";
+
+      URL srcServerXml = getClass().getResource(SERVER_XML_FILE_NAME);
+      assertThat(srcServerXml).isNotNull();
+      File destServerXml = new File(this.serverDirectory, SERVER_XML_FILE_NAME);
+      copyURLToFile(srcServerXml, destServerXml);
+
+      this.serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+      List<String> jvmArguments = getJvmArguments();
+
+      List<String> command = new ArrayList<>();
+      command.add(
+          new File(new File(System.getProperty("java.home"), "bin"), "java").getCanonicalPath());
+      command.addAll(jvmArguments);
+      command.add("-Dgemfire.cache-xml-file=" + destServerXml.getAbsolutePath());
+      command.add("-cp");
+      command.add(System.getProperty("java.class.path"));
+      command.add(ServerLauncher.class.getName());
+      command.add(ServerLauncher.Command.START.getName());
+      command.add("server1");
+      command.add("--server-port=" + this.serverPort);
+      // command.add("--redirect-output");
+
+      this.process = new ProcessBuilder(command).directory(this.temporaryFolder.getRoot()).start();
+
+      // From here on a forked JVM exists; make sure it does not outlive a failed setup.
+      boolean started = false;
+      try {
+        awaitServerOnline();
+
+        this.clientCache =
+            new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create();
+        this.region =
+            this.clientCache.<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
+                .create(REGION_NAME);
+        started = true;
+      } finally {
+        if (!started) {
+          this.process.destroyForcibly();
+          this.temporaryFolder.delete();
+        }
+      }
+    }
+
+    /**
+     * Closes the client cache (if one was created), asks the forked server to stop and finally
+     * destroys the server process and the temporary working directory.
+     */
+    @TearDown(Level.Trial)
+    public void stopServer() throws Exception {
+      try {
+        if (this.clientCache != null) {
+          this.clientCache.close(false);
+        }
+        new ServerLauncher.Builder().setWorkingDirectory(this.serverDirectory.getAbsolutePath())
+            .build().stop();
+      } finally {
+        if (this.process != null) {
+          this.process.destroyForcibly();
+        }
+        this.temporaryFolder.delete();
+      }
+    }
+
+    /**
+     * Polls the status of the server running in {@link #serverDirectory} until it reports ONLINE.
+     * Fails fast if the forked process dies and gives up after
+     * {@link #SERVER_START_TIMEOUT_NANOS}, so the setup can never block forever.
+     */
+    private void awaitServerOnline() throws Exception {
+      this.launcher = new ServerLauncher.Builder()
+          .setWorkingDirectory(this.serverDirectory.getAbsolutePath()).build();
+
+      long start = System.nanoTime();
+      while (this.launcher.status()
+          .getStatus() != org.apache.geode.distributed.AbstractLauncher.Status.ONLINE) {
+        assertThat(this.process.isAlive()).isTrue();
+        if (System.nanoTime() - start > SERVER_START_TIMEOUT_NANOS) {
+          throw new IllegalStateException("Server in " + this.serverDirectory
+              + " did not come online on port " + this.serverPort);
+        }
+        Thread.sleep(SERVER_STATUS_POLL_MILLIS);
+      }
+    }
+
+    /**
+     * Returns the system properties passed to the forked server JVM: multicast disabled and an
+     * empty locators list.
+     */
+    private List<String> getJvmArguments() {
+      List<String> jvmArguments = new ArrayList<>();
+      jvmArguments.add(
+          "-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.MCAST_PORT + "=0");
+      // ProcessBuilder does no shell quoting, so an empty value is simply "name=".
+      jvmArguments
+          .add("-D" + DistributionConfig.GEMFIRE_PREFIX + ConfigurationProperties.LOCATORS + "=");
+      return jvmArguments;
+    }
+  }
+
+  /**
+   * Benchmark body: puts a random key/value pair into the client region and hands the inputs and
+   * the previous value to the blackhole so the work cannot be optimized away.
+   */
+  @Benchmark
+  public void test(ClientState state, Blackhole blackhole) throws Exception {
+    String key = "key-" + state.random.nextInt();
+    String value = "value-" + state.random.nextInt();
+    String oldValue = state.region.put(key, value);
+    blackhole.consume(new Object[] {key, value, oldValue});
+    blackhole.consume(oldValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
new file mode 100644
index 0000000..6ccd8c3
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+public class Put65Bench {
+
+  @State(Scope.Benchmark)
+  public static class ServerConnectionState {
+    public Command command;
+    public ServerConnection mockServerConnection;
+    public Message message;
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+      loadEmergencyClasses();
+
+      this.command = Put65.getCommand();
+
+      this.mockServerConnection = mock(ServerConnection.class);
+      when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+      TXManagerImpl txManager = mock(TXManagerImpl.class);
+      GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+      when(cache.getTxManager()).thenReturn(txManager);
+
+      when(this.mockServerConnection.getCache()).thenReturn(cache);
+
+      CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+      when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+
+      // .getDistributedMember()
+      ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+      when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+      Message errorResponseMessage = mock(Message.class);
+      when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+
+      Part regionNamePart = mock(Part.class);
+      when(regionNamePart.getString()).thenReturn("regionNamePart");
+
+      Part operationPart = mock(Part.class);
+      when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+
+      Part flagsPart = mock(Part.class);
+      when(flagsPart.getInt()).thenReturn(0);
+
+      Part keyPart = mock(Part.class);
+      when(keyPart.getObject()).thenReturn("keyPart");
+      when(keyPart.getStringOrObject()).thenReturn("keyPart");
+
+      Part isDeltaPart = mock(Part.class);
+      when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+      Part valuePart = mock(Part.class);
+      when(valuePart.getObject()).thenReturn("valuePart");
+
+      Part eventPart = mock(Part.class);
+      when(eventPart.getObject()).thenReturn("eventPart");
+
+      Part callbackArgPart = mock(Part.class);
+      when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+
+      message = mock(Message.class);
+
+      when(message.getTransactionId()).thenReturn(NOTX);
+
+      when(message.getPart(0)).thenReturn(regionNamePart);
+      when(message.getPart(1)).thenReturn(operationPart);
+      when(message.getPart(2)).thenReturn(flagsPart);
+      when(message.getPart(3)).thenReturn(keyPart);
+      when(message.getPart(4)).thenReturn(isDeltaPart);
+      when(message.getPart(5)).thenReturn(valuePart);
+      when(message.getPart(6)).thenReturn(eventPart);
+      when(message.getPart(7)).thenReturn(callbackArgPart);
+    }
+  }
+
+  // @Benchmark
+  public void benchmark(ServerConnectionState state, Blackhole blackhole) {
+    state.command.execute(state.message, state.mockServerConnection);
+    // Message replyMessage = state.mockServerConnection.getReplyMessage();
+    // blackhole.consume(replyMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
new file mode 100644
index 0000000..2013b37
--- /dev/null
+++ b/geode-core/src/jmh/resources/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench-server.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one or more
+	contributor license agreements.  See the NOTICE file distributed with
+	this work for additional information regarding copyright ownership.
+	The ASF licenses this file to You under the Apache License, Version 2.0
+	(the "License"); you may not use this file except in compliance with
+	the License.  You may obtain a copy of the License at
+
+	     http://www.apache.org/licenses/LICENSE-2.0
+
+	Unless required by applicable law or agreed to in writing, software
+	distributed under the License is distributed on an "AS IS" BASIS,
+	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+	See the License for the specific language governing permissions and
+	limitations under the License.
+-->
+<cache
+        xmlns="http://geode.apache.org/schema/cache"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+        version="1.0">
+    <region name="clientCachePutBench-region" refid="REPLICATE">
+        <region-attributes>
+            <key-constraint>java.lang.String</key-constraint>
+            <value-constraint>java.lang.String</value-constraint>
+        </region-attributes>
+    </region>
+</cache>

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 95d1a5b..74df19c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -386,10 +386,22 @@ public class ClusterConfigurationService {
 
     createConfigDirIfNecessary(groupName);
 
-    byte[] jarBytes = locators.stream()
-        .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
-        .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
-            "No locators have a deployed jar named " + jarName + " in " + groupName));
+    byte[] jarBytes = null;
+    for (DistributedMember locator : locators) {
+      jarBytes = downloadJarFromLocator(locator, groupName, jarName);
+      if (jarBytes != null) {
+        break;
+      }
+    }
+    if (jarBytes == null) {
+      throw new IllegalStateException(
+          "No locators have a deployed jar named " + jarName + " in " + groupName);
+    }
+
+    // byte[] jarBytes = locators.stream()
+    // .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName))
+    // .filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException(
+    // "No locators have a deployed jar named " + jarName + " in " + groupName));
 
     File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
     FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index d217672..ff9daca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -91,31 +91,26 @@ public abstract class BaseCommand implements Command {
   private static final int MAX_INCOMING_MSGS =
       Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
 
-  private static final Semaphore incomingDataLimiter;
-
-  private static final Semaphore incomingMsgLimiter;
-  static {
-    Semaphore tmp;
-    if (MAX_INCOMING_DATA > 0) {
-      // backport requires that this is fair since we inc by values > 1
-      tmp = new Semaphore(MAX_INCOMING_DATA, true);
-    } else {
-      tmp = null;
-    }
-    incomingDataLimiter = tmp;
-    if (MAX_INCOMING_MSGS > 0) {
-      tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
-      // performance
-    } else {
-      tmp = null;
-    }
-    incomingMsgLimiter = tmp;
+  // backport requires that this is fair since we inc by values > 1
+  private static final Semaphore incomingDataLimiter =
+      createIncomingLimiterSemaphore(MAX_INCOMING_DATA, true);
+
+  // unfair for best performance
+  private static final Semaphore incomingMsgLimiter =
+      createIncomingLimiterSemaphore(MAX_INCOMING_MSGS, false);
 
+  private static Semaphore createIncomingLimiterSemaphore(final int maximum, final boolean fair) {
+    Semaphore semaphore = null;
+    if (maximum > 0) {
+      semaphore = new Semaphore(maximum, fair);
+    }
+    return semaphore;
   }
 
   protected SecurityService securityService = IntegratedSecurityService.getSecurityService();
 
-  final public void execute(Message msg, ServerConnection servConn) {
+  @Override
+  public void execute(final Message msg, final ServerConnection servConn) {
     // Read the request and update the statistics
     long start = DistributionStats.getStatTime();
     // servConn.resetTransientData();

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..46e43c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -34,11 +34,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
 
 /**
  * This class represents a ConnectionProxy of the CacheClient
- * 
- * 
- * 
  */
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
     implements DataSerializableFixedID, Serializable, Externalizable {
 
   private static final Logger logger = LogService.getLogger();

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
new file mode 100644
index 0000000..017e0f5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed;
+
+import org.apache.geode.cache.Cache;
+
+/**
+ * Test helper that exposes state of {@link ServerLauncher} which is not publicly accessible.
+ */
+public class ServerLauncherUtils {
+
+  /**
+   * Looks up the Cache held by an online, in-process ServerLauncher.
+   *
+   * @param serverLauncher the launcher to query
+   * @return whatever {@code serverLauncher.getCache()} returns
+   */
+  public static Cache getCache(final ServerLauncher serverLauncher) {
+    final Cache launcherCache = serverLauncher.getCache();
+    return launcherCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
new file mode 100644
index 0000000..8cd7622
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.CacheServerImpl;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test helper giving access to the CacheServer, AcceptorImpl and ServerConnection that belong to a
+ * Cache.
+ */
+public class CacheServerUtils {
+
+  /**
+   * Returns the first CacheServer registered with the specified Cache.
+   *
+   * @throws IndexOutOfBoundsException if the cache has no cache servers
+   */
+  public static CacheServer getCacheServer(final Cache cache) {
+    return cache.getCacheServers().get(0);
+  }
+
+  /**
+   * Returns the AcceptorImpl of the specified CacheServer, which must be a CacheServerImpl.
+   */
+  public static AcceptorImpl getAcceptorImpl(final CacheServer cacheServer) {
+    final CacheServerImpl serverImpl = (CacheServerImpl) cacheServer;
+    return serverImpl.getAcceptor();
+  }
+
+  /**
+   * Returns the first ServerConnection known to the acceptor of the specified CacheServer.
+   *
+   * @throws java.util.NoSuchElementException if the acceptor has no server connections
+   */
+  public static ServerConnection getServerConnection(final CacheServer cacheServer) {
+    final Set<ServerConnection> connections =
+        getAcceptorImpl(cacheServer).getAllServerConnections();
+    return connections.iterator().next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
new file mode 100644
index 0000000..2d900dc
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+/**
+ * Experimental integration test that starts an in-process server and builds a ServerConnection on
+ * top of a mocked Socket so that commands can be executed without a real client.
+ */
+@Category(IntegrationTest.class)
+public class ExperimentIntegrationTest {
+
+  private ServerLauncher serverLauncher;
+  private ServerConnection serverConnection;
+
+  /**
+   * Starts a server on a random available port and creates a ServerConnection bound to that
+   * server's acceptor.
+   */
+  @Before
+  public void before() throws Exception {
+    int serverPort = getRandomAvailablePort(SOCKET);
+
+    this.serverLauncher =
+        new ServerLauncher.Builder().setMemberName("server").setServerPort(serverPort).build();
+    this.serverLauncher.start();
+
+    Cache cache = getCache(this.serverLauncher);
+    CacheServer cacheServer = getCacheServer(cache);
+    AcceptorImpl acceptor = getAcceptorImpl(cacheServer);
+
+    Socket mockSocket = mock(Socket.class);
+    when(mockSocket.getInetAddress()).thenReturn(SocketCreator.getLocalHost());
+
+    this.serverConnection =
+        new ServerConnection(mockSocket, cache, null, null, DEFAULT_HANDSHAKE_TIMEOUT_MS,
+            CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, "client", Acceptor.CLIENT_TO_SERVER, acceptor);
+
+    preConditions();
+  }
+
+  /**
+   * Stops the in-process server so it does not leak into tests that run afterwards. Runs even when
+   * {@link #before()} failed part-way, hence the null check.
+   */
+  @org.junit.After
+  public void after() throws Exception {
+    if (this.serverLauncher != null) {
+      this.serverLauncher.stop();
+    }
+  }
+
+  /**
+   * Verifies that the server started by {@link #before()} reports ONLINE.
+   */
+  public void preConditions() throws Exception {
+    assertThat(this.serverLauncher.status().getStatus()).isEqualTo(ONLINE);
+  }
+
+  /**
+   * Placeholder: executes a mocked Command with a mocked Message against the real connection.
+   */
+  @Test
+  public void handlePutFromFakeClient() throws Exception {
+    Message message = mock(Message.class);
+    Command command = mock(Command.class);
+    command.execute(message, this.serverConnection);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0f4f2940/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
new file mode 100644
index 0000000..b52e81d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(UnitTest.class)
+public class ExperimentTest {
+
+  private ServerConnection mockServerConnection;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void before() throws Exception {
+    this.mockServerConnection = mock(ServerConnection.class);
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    when(cache.getTxManager()).thenReturn(txManager);
+
+    when(this.mockServerConnection.getCache()).thenReturn(cache);
+
+    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+    when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+
+    // .getDistributedMember()
+    ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+    Message errorResponseMessage = mock(Message.class);
+    when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+  }
+
+  @Test
+  public void handlePutFromFakeClient() throws Exception {
+    Part regionNamePart = mock(Part.class);
+    when(regionNamePart.getString()).thenReturn("regionNamePart");
+
+    Part operationPart = mock(Part.class);
+    when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part flagsPart = mock(Part.class);
+    when(flagsPart.getInt()).thenReturn(0);
+
+    Part keyPart = mock(Part.class);
+    when(keyPart.getObject()).thenReturn("keyPart");
+    when(keyPart.getStringOrObject()).thenReturn("keyPart");
+
+    Part isDeltaPart = mock(Part.class);
+    when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part valuePart = mock(Part.class);
+    when(valuePart.getObject()).thenReturn("valuePart");
+
+    Part eventPart = mock(Part.class);
+    when(eventPart.getObject()).thenReturn("eventPart");
+
+    Part callbackArgPart = mock(Part.class);
+    when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+
+    Message message = mock(Message.class);
+
+    when(message.getTransactionId()).thenReturn(NOTX);
+
+    when(message.getPart(0)).thenReturn(regionNamePart);
+    when(message.getPart(1)).thenReturn(operationPart);
+    when(message.getPart(2)).thenReturn(flagsPart);
+    when(message.getPart(3)).thenReturn(keyPart);
+    when(message.getPart(4)).thenReturn(isDeltaPart);
+    when(message.getPart(5)).thenReturn(valuePart);
+    when(message.getPart(6)).thenReturn(eventPart);
+    when(message.getPart(7)).thenReturn(callbackArgPart);
+
+    assertThat(message.getPart(0)).isSameAs(regionNamePart);
+    assertThat(message.getPart(1)).isSameAs(operationPart);
+    assertThat(message.getPart(2)).isSameAs(flagsPart);
+    assertThat(message.getPart(3)).isSameAs(keyPart);
+    assertThat(message.getPart(4)).isSameAs(isDeltaPart);
+    assertThat(message.getPart(5)).isSameAs(valuePart);
+    assertThat(message.getPart(6)).isSameAs(eventPart);
+    assertThat(message.getPart(7)).isSameAs(callbackArgPart);
+
+    Command command = Put65.getCommand();
+    command.execute(message, this.mockServerConnection);
+  }
+}


[08/12] geode git commit: GEODE-2757: Do not process a netsearch reply from a departed node whose departure the membership listener has already detected.

Posted by kl...@apache.org.
GEODE-2757: Do not process a netsearch reply from a departed node whose departure the membership listener has already detected.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a

Branch: refs/heads/feature/GEODE-2632
Commit: d497d63af422b3b98c480698a9470812539f8a83
Parents: 799548e
Author: eshu <es...@pivotal.io>
Authored: Fri Apr 7 11:37:35 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Apr 7 11:37:35 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/DistributedRegion.java |   2 +-
 .../cache/SearchLoadAndWriteProcessor.java      |  61 +++++++----
 .../cache/SearchLoadAndWriteProcessorTest.java  | 102 ++++++++++++++++++-
 3 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index fa02574..c12a652 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     return this.distAdvisor;
   }
 
-  public final CacheDistributionAdvisor getCacheDistributionAdvisor() {
+  public CacheDistributionAdvisor getCacheDistributionAdvisor() {
     return this.distAdvisor;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
index 3d969f9..2a10792 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
@@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
       Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue();
 
 
-  private InternalDistributedMember selectedNode;
+  private volatile InternalDistributedMember selectedNode;
   private boolean selectedNodeDead = false;
   private int timeout;
   private boolean netSearchDone = false;
@@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   private final Object membersLock = new Object();
 
+  private ArrayList<InternalDistributedMember> departedMembers;
+
   private Lock lock = null; // if non-null, then needs to be unlocked in release
 
   static final int NETSEARCH = 0;
@@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
     }
     synchronized (this) {
       if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) {
+        if (departedMembers == null) {
+          departedMembers = new ArrayList<InternalDistributedMember>();
+        }
+        departedMembers.add(id);
         selectedNode = null;
         selectedNodeDead = true;
         computeRemainingTimeout();
@@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         notifyAll(); // signal the waiter; we are not done; but we need the waiter to call
                      // sendNetSearchRequest
       }
-      if (responseQueue != null)
+      if (responseQueue != null) {
         responseQueue.remove(id);
+      }
       checkIfDone();
     }
   }
@@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   /************** Package Methods **********************/
 
+  InternalDistributedMember getSelectedNode() {
+    return this.selectedNode;
+  }
+
   /************** Private Methods **********************/
   /**
    * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region,
@@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         synchronized (this.pendingResponders) {
           this.pendingResponders.clear();
         }
-        this.requestInProgress = true;
-        this.remoteGetInProgress = true;
+
         synchronized (this) {
+          this.requestInProgress = true;
+          this.remoteGetInProgress = true;
           setSelectedNode(replicate);
           this.lastNotifySpot = 0;
-        }
-        sendValueRequest(replicate);
-        waitForObject2(this.remainingTimeout);
-        if (this.authorative) {
-          if (this.result != null) {
-            this.netSearch = true;
+
+          sendValueRequest(replicate);
+          waitForObject2(this.remainingTimeout);
+
+          if (this.authorative) {
+            if (this.result != null) {
+              this.netSearch = true;
+            }
+            return;
+          } else {
+            // clear anything that might have been set by our query.
+            this.selectedNode = null;
+            this.selectedNodeDead = false;
+            this.lastNotifySpot = 0;
+            this.result = null;
           }
-          return;
-        } else {
-          // clear anything that might have been set by our query.
-          this.selectedNode = null;
-          this.selectedNodeDead = false;
-          this.lastNotifySpot = 0;
-          this.result = null;
         }
       }
       synchronized (membersLock) {
@@ -1055,8 +1069,15 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   @SuppressWarnings("hiding")
   protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime,
-      boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag) {
+      boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag,
+      InternalDistributedMember responder) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (departedMembers != null && departedMembers.contains(responder)) {
+      if (isDebugEnabled) {
+        logger.debug("ignore the reply received from a departed member");
+      }
+      return;
+    }
 
     if (this.requestInProgress) {
       if (requestorTimedOut) {
@@ -1163,7 +1184,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
   private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException {
     if (this.requestInProgress) {
       try {
-        final DM dm = this.region.cache.getDistributedSystem().getDistributionManager();
+        final DM dm = this.region.getCache().getDistributedSystem().getDistributionManager();
         long waitTimeMs = timeoutMs;
         final long endTime = System.currentTimeMillis() + waitTimeMs;
         for (;;) {
@@ -2018,7 +2039,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         this.versionTag.replaceNullIDs(getSender());
       }
       processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized,
-          this.requestorTimedOut, this.authoritative, this.versionTag);
+          this.requestorTimedOut, this.authoritative, this.versionTag, getSender());
     }
 
     public int getDSFID() {

http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
index bfe78b0..91ac16b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -14,15 +14,29 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 
 @Category(UnitTest.class)
 public class SearchLoadAndWriteProcessorTest {
@@ -62,4 +76,90 @@ public class SearchLoadAndWriteProcessorTest {
     }
   }
 
+  InternalDistributedMember departedMember;
+
+  @Test
+  public void verifyNoProcessingReplyFromADepartedMember() {
+    SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+    DistributedRegion lr = mock(DistributedRegion.class);
+    RegionAttributes attrs = mock(RegionAttributes.class);
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    InternalDistributedSystem ds = mock(InternalDistributedSystem.class);
+    DM dm = mock(DM.class);
+    CacheDistributionAdvisor advisor = mock(CacheDistributionAdvisor.class);
+    CachePerfStats stats = mock(CachePerfStats.class);
+    ExpirationAttributes expirationAttrs = mock(ExpirationAttributes.class);
+    InternalDistributedMember m1 = mock(InternalDistributedMember.class);
+    InternalDistributedMember m2 = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> replicates = new HashSet<InternalDistributedMember>();;
+    replicates.add(m1);
+    replicates.add(m2);
+
+    when(lr.getAttributes()).thenReturn(attrs);
+    when(lr.getSystem()).thenReturn(ds);
+    when(lr.getCache()).thenReturn(cache);
+    when(lr.getCacheDistributionAdvisor()).thenReturn(advisor);
+    when(lr.getDistributionManager()).thenReturn(dm);
+    when(lr.getCachePerfStats()).thenReturn(stats);
+    when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    when(lr.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+    when(cache.getDistributedSystem()).thenReturn(ds);
+    when(cache.getSearchTimeout()).thenReturn(30);
+    when(attrs.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    when(attrs.getDataPolicy()).thenReturn(DataPolicy.EMPTY);
+    when(attrs.getEntryTimeToLive()).thenReturn(expirationAttrs);
+    when(attrs.getEntryIdleTimeout()).thenReturn(expirationAttrs);
+    when(advisor.adviseInitializedReplicates()).thenReturn(replicates);
+
+    Object key = "k1";
+    byte[] v1 = "v1".getBytes();
+    byte[] v2 = "v2".getBytes();
+    EntryEventImpl event = EntryEventImpl.create(lr, Operation.GET, key, null, null, false, null);
+
+
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> processor.getSelectedNode() != null);
+        departedMember = processor.getSelectedNode();
+        // Simulate member departed event
+        processor.memberDeparted(departedMember, true);
+      }
+    });
+    t1.start();
+
+    Thread t2 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> departedMember != null && processor.getSelectedNode() != null
+                && departedMember != processor.getSelectedNode());
+
+        // Handle search result from the departed member
+        processor.incomingNetSearchReply(v1, System.currentTimeMillis(), false, false, true,
+            mock(VersionTag.class), departedMember);
+      }
+    });
+    t2.start();
+
+    Thread t3 = new Thread(new Runnable() {
+      public void run() {
+        Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS)
+            .pollDelay(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+            .until(() -> departedMember != null && processor.getSelectedNode() != null
+                && departedMember != processor.getSelectedNode());
+        // Handle search result from a new member
+        processor.incomingNetSearchReply(v2, System.currentTimeMillis(), false, false, true,
+            mock(VersionTag.class), processor.getSelectedNode());
+      }
+    });
+    t3.start();
+
+    processor.initialize(lr, key, null);
+    processor.doSearchAndLoad(event, null, null);
+
+    assertTrue(Arrays.equals((byte[]) event.getNewValue(), v2));
+  }
+
 }


[02/12] geode git commit: GEODE-2732 after auto-reconnect a server is restarted on the default port

Posted by kl...@apache.org.
GEODE-2732 after auto-reconnect a server is restarted on the default port

Gfsh command line parameters were put into ThreadLocals to make them
available to the XML parser.  These are now held in non-thread-local
variables so that all threads, including the auto-reconnect thread,
can see them when building the cache.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/391502a2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/391502a2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/391502a2

Branch: refs/heads/feature/GEODE-2632
Commit: 391502a2615dbc80cde0b9a111fa967f4d76c39a
Parents: 39c72b2
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:11:04 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../geode/distributed/ServerLauncher.java       |  10 +-
 .../internal/cache/CacheServerLauncher.java     |  43 +++++---
 .../internal/cache/xmlcache/CacheCreation.java  |   2 +-
 .../cache30/ReconnectWithCacheXMLDUnitTest.java | 107 +++++++++++++++++++
 .../ReconnectWithUDPSecurityDUnitTest.java      |   6 ++
 .../ReconnectedCacheServerDUnitTest.java        |   4 +-
 .../cache30/ReconnectWithCacheXMLDUnitTest.xml  |  25 +++++
 7 files changed, 176 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9435bd8..c96732c 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -274,7 +274,7 @@ public class ServerLauncher extends AbstractLauncher<String> {
     this.assignBuckets = Boolean.TRUE.equals(builder.getAssignBuckets());
     setDebug(Boolean.TRUE.equals(builder.getDebug()));
     this.disableDefaultServer = Boolean.TRUE.equals(builder.getDisableDefaultServer());
-    CacheServerLauncher.disableDefaultServer.set(this.disableDefaultServer);
+    CacheServerLauncher.setDisableDefaultServer(this.disableDefaultServer);
     this.distributedSystemProperties = builder.getDistributedSystemProperties();
     this.force = Boolean.TRUE.equals(builder.getForce());
     this.help = Boolean.TRUE.equals(builder.getHelp());
@@ -286,11 +286,11 @@ public class ServerLauncher extends AbstractLauncher<String> {
     this.redirectOutput = Boolean.TRUE.equals(builder.getRedirectOutput());
     this.serverBindAddress = builder.getServerBindAddress();
     if (builder.isServerBindAddressSetByUser() && this.serverBindAddress != null) {
-      CacheServerLauncher.serverBindAddress.set(this.serverBindAddress.getHostAddress());
+      CacheServerLauncher.setServerBindAddress(this.serverBindAddress.getHostAddress());
     }
     this.serverPort = builder.getServerPort();
     if (builder.isServerPortSetByUser() && this.serverPort != null) {
-      CacheServerLauncher.serverPort.set(this.serverPort);
+      CacheServerLauncher.setServerPort(this.serverPort);
     }
     this.springXmlLocation = builder.getSpringXmlLocation();
     this.workingDirectory = builder.getWorkingDirectory();
@@ -954,8 +954,8 @@ public class ServerLauncher extends AbstractLauncher<String> {
       final String serverBindAddress =
           (getServerBindAddress() == null ? null : getServerBindAddress().getHostAddress());
       final Integer serverPort = getServerPort();
-      CacheServerLauncher.serverBindAddress.set(serverBindAddress);
-      CacheServerLauncher.serverPort.set(serverPort);
+      CacheServerLauncher.setServerBindAddress(serverBindAddress);
+      CacheServerLauncher.setServerPort(serverPort);
       final CacheServer cacheServer = cache.addCacheServer();
       cacheServer.setBindAddress(serverBindAddress);
       cacheServer.setPort(serverPort);

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
index 760abd3..9a544d2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
@@ -565,28 +565,43 @@ public class CacheServerLauncher {
     }
   }
 
-  public static ThreadLocal<Integer> serverPort = new ThreadLocal<Integer>();
+  private static Integer serverPort;
+
+  private static String serverBindAddress;
+
+  public static void setServerPort(Integer serverPort) {
+    CacheServerLauncher.serverPort = serverPort;
+  }
+
+  public static void setServerBindAddress(String serverBindAddress) {
+    CacheServerLauncher.serverBindAddress = serverBindAddress;
+  }
+
+  public static void setDisableDefaultServer(Boolean disableDefaultServer) {
+    CacheServerLauncher.disableDefaultServer = disableDefaultServer;
+  }
+
+  public static Boolean disableDefaultServer;
+
 
-  public static ThreadLocal<String> serverBindAddress = new ThreadLocal<String>();
 
   public static Integer getServerPort() {
-    return serverPort.get();
+    return serverPort;
   }
 
   public static String getServerBindAddress() {
-    return serverBindAddress.get();
+    return serverBindAddress;
   }
 
-  public static ThreadLocal<Boolean> disableDefaultServer = new ThreadLocal<Boolean>();
-
   public static Boolean getDisableDefaultServer() {
-    return disableDefaultServer.get();
+    return disableDefaultServer;
   }
 
+
   public static void clearStatics() {
-    disableDefaultServer.set(null);
-    serverPort.set(null);
-    serverBindAddress.set(null);
+    disableDefaultServer = null;
+    serverPort = null;
+    serverBindAddress = null;
   }
 
 
@@ -616,11 +631,11 @@ public class CacheServerLauncher {
     final String serverPortString = (String) options.get(SERVER_PORT);
 
     if (serverPortString != null) {
-      serverPort.set(Integer.parseInt(serverPortString));
+      serverPort = Integer.parseInt(serverPortString);
     }
 
-    serverBindAddress.set((String) options.get(SERVER_BIND_ADDRESS_NAME));
-    disableDefaultServer.set((Boolean) options.get(DISABLE_DEFAULT_SERVER));
+    serverBindAddress = (String) options.get(SERVER_BIND_ADDRESS_NAME);
+    disableDefaultServer = (Boolean) options.get(DISABLE_DEFAULT_SERVER);
     workingDir = new File(System.getProperty("user.dir"));
 
     // Say that we're starting...
@@ -835,7 +850,7 @@ public class CacheServerLauncher {
     // Create and start a default cache server
     // If (disableDefaultServer is not set or it is set but false) AND (the number of cacheservers
     // is 0)
-    Boolean disable = disableDefaultServer.get();
+    Boolean disable = disableDefaultServer;
     if ((disable == null || !disable) && cache.getCacheServers().size() == 0) {
       // Create and add a cache server
       CacheServer server = cache.addCacheServer();

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 1c3c933..a0810d9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -503,7 +503,7 @@ public class CacheCreation implements InternalCache {
 
     Integer serverPort = CacheServerLauncher.getServerPort();
     String serverBindAdd = CacheServerLauncher.getServerBindAddress();
-    Boolean disableDefaultServer = CacheServerLauncher.disableDefaultServer.get();
+    Boolean disableDefaultServer = CacheServerLauncher.getDisableDefaultServer();
     startCacheServers(this.getCacheServers(), cache, serverPort, serverBindAdd,
         disableDefaultServer);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
new file mode 100755
index 0000000..4f2fac1
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.membership.MembershipTestHook;
+import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerLauncher;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.util.test.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
+public class ReconnectWithCacheXMLDUnitTest extends JUnit4CacheTestCase {
+
+
+  public ReconnectWithCacheXMLDUnitTest() {
+    super();
+  }
+
+  private static final long serialVersionUID = 1L;
+
+
+  private String xmlProperty = DistributionConfig.GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile";
+  private String oldPropertySetting;
+
+  @Override
+  public final void postSetUp() {
+    oldPropertySetting = System.setProperty(xmlProperty, "true");
+  }
+
+  @Override
+  public final void preTearDownCacheTestCase() throws Exception {
+    if (oldPropertySetting == null) {
+      System.getProperties().remove(xmlProperty);
+    } else {
+      System.setProperty(xmlProperty, oldPropertySetting);
+    }
+  }
+
+  @Override
+  public Properties getDistributedSystemProperties() {
+    Properties result = super.getDistributedSystemProperties();
+    String fileName = TestUtil.getResourcePath(getClass(), "ReconnectWithCacheXMLDUnitTest.xml");
+    result.put(ConfigurationProperties.CACHE_XML_FILE, fileName);
+    result.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "true");
+    result.put(ConfigurationProperties.DISABLE_AUTO_RECONNECT, "false");
+    result.put(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "2000");
+    return result;
+  }
+
+  @Test
+  public void testCacheServerLauncherPortRetained() throws Exception {
+    CacheServerLauncher.setDisableDefaultServer(true);
+    CacheServerLauncher.setServerPort(AvailablePortHelper.getRandomAvailableTCPPort());
+    Cache cache = getCache();
+
+    final AtomicBoolean membershipFailed = new AtomicBoolean();
+    MembershipManagerHelper.addTestHook(cache.getDistributedSystem(), new MembershipTestHook() {
+      @Override
+      public void beforeMembershipFailure(String reason, Throwable cause) {
+        membershipFailed.set(true);
+      }
+
+      @Override
+      public void afterMembershipFailure(String reason, Throwable cause) {}
+    });
+    MembershipManagerHelper.crashDistributedSystem(cache.getDistributedSystem());
+    assertTrue(membershipFailed.get());
+
+    await().atMost(60, TimeUnit.SECONDS).until(() -> cache.getReconnectedCache() != null);
+
+    Cache newCache = cache.getReconnectedCache();
+    CacheServer server = newCache.getCacheServers().iterator().next();
+    assertEquals(CacheServerLauncher.getServerPort().intValue(), server.getPort());
+    assertEquals(20, server.getMaxConnections()); // this setting is in the XML file
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
index a52d8bf..55d0a3c 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
@@ -17,8 +17,14 @@ package org.apache.geode.cache30;
 import java.util.Properties;
 
 import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.junit.experimental.categories.Category;
+
 import static org.apache.geode.distributed.ConfigurationProperties.*;
 
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
 public class ReconnectWithUDPSecurityDUnitTest extends ReconnectDUnitTest {
 
   public ReconnectWithUDPSecurityDUnitTest() {

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
index 3224257..2a2fe73 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache30;
 
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
@@ -32,7 +34,7 @@ import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipMan
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 
 
-@Category(DistributedTest.class)
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
 public class ReconnectedCacheServerDUnitTest extends JUnit4CacheTestCase {
 
   public ReconnectedCacheServerDUnitTest() {

http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
new file mode 100644
index 0000000..f7e338b
--- /dev/null
+++ b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<cache
+    xmlns="http://geode.apache.org/schema/cache"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+    version="1.0">
+  <cache-server max-connections="20"/>
+</cache>
+


[12/12] geode git commit: WIP refactoring

Posted by kl...@apache.org.
WIP refactoring


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/7a83cccb
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/7a83cccb
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/7a83cccb

Branch: refs/heads/feature/GEODE-2632
Commit: 7a83cccb158f4464f7d439830769148dd9e4fc72
Parents: 0f4f294
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 5 10:24:23 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Fri Apr 7 12:49:42 2017 -0700

----------------------------------------------------------------------
 .../sockets/command/ClientCachePutBench.java    |  16 +-
 .../cache/tier/sockets/command/Put65Bench.java  |  94 +--
 .../geode/internal/cache/CacheServerImpl.java   |   2 +-
 .../geode/internal/cache/tier/Acceptor.java     |   2 +-
 .../cache/tier/sockets/AcceptorImpl.java        |  42 +-
 .../cache/tier/sockets/CacheClientNotifier.java | 583 +++-------------
 .../cache/tier/sockets/CacheClientProxy.java    | 698 ++++++++-----------
 .../cache/tier/sockets/ClientHealthMonitor.java |  15 +-
 .../tier/sockets/ClientUpdateMessageImpl.java   |   4 +-
 .../internal/cache/tier/sockets/HandShake.java  |  12 +-
 .../geode/internal/logging/LogService.java      |  10 +
 .../tier/sockets/AcceptorImplJUnitTest.java     |  22 +-
 .../cache/tier/sockets/AcceptorImplTest.java    |  96 +++
 .../tier/sockets/ClientConflationDUnitTest.java |   2 +-
 .../ClientServerForceInvalidateDUnitTest.java   |   4 +-
 .../tier/sockets/ClientServerMiscDUnitTest.java |   9 +-
 .../cache/tier/sockets/ConflationDUnitTest.java |   4 +-
 .../cache/tier/sockets/HAInterestTestCase.java  |  26 +-
 .../sockets/HAStartupAndFailoverDUnitTest.java  |   4 +-
 .../sockets/InterestListRecoveryDUnitTest.java  |   4 +-
 .../tier/sockets/RedundancyLevelTestBase.java   |  17 +-
 .../tier/sockets/command/Put65BenchTest.java    | 116 +++
 .../sockets/command/Put65RealBenchTest.java     | 141 ++++
 .../sockets/DurableClientSimpleDUnitTest.java   |  14 +-
 .../tier/sockets/DurableClientTestCase.java     |   6 +-
 .../cache/wan/Simple2CacheServerDUnitTest.java  |   6 +-
 26 files changed, 945 insertions(+), 1004 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
index a1cbd81..df51b78 100644
--- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java
@@ -14,9 +14,12 @@
  */
 package org.apache.geode.internal.cache.tier.sockets.command;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.io.FileUtils.*;
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
 import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -24,6 +27,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.distributed.AbstractLauncher.Status;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.ServerLauncher;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -121,16 +125,22 @@ public class ClientCachePutBench {
       command.add(ServerLauncher.Command.START.getName());
       command.add("server1");
       command.add("--server-port=" + this.serverPort);
-      // command.add("--redirect-output");
+      // put65Command.add("--redirect-output");
 
       this.process = new ProcessBuilder(command).directory(this.temporaryFolder.getRoot()).start();
 
-      boolean forever = true;
-      while (forever) {
+      boolean sleep = false;
+      while (sleep) {
         assertThat(this.process.isAlive()).isTrue();
         Thread.sleep(10000);
       }
 
+      ServerLauncher serverLauncher = new ServerLauncher.Builder()
+          .setWorkingDirectory(this.temporaryFolder.getRoot().getAbsolutePath()).build();
+
+      await().atMost(2, MINUTES)
+          .until(() -> assertThat(serverLauncher.status().getStatus()).isEqualTo(ONLINE));
+
       this.clientCache =
           new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create();
       this.region =

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
----------------------------------------------------------------------
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
index 6ccd8c3..d393769 100644
--- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java
@@ -16,10 +16,8 @@ package org.apache.geode.internal.cache.tier.sockets.command;
 
 import static org.apache.geode.SystemFailure.loadEmergencyClasses;
 import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
-import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -30,8 +28,6 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
@@ -42,78 +38,82 @@ public class Put65Bench {
 
   @State(Scope.Benchmark)
   public static class ServerConnectionState {
-    public Command command;
+    public Command put65Command;
     public ServerConnection mockServerConnection;
-    public Message message;
+    public Message mockMessage;
 
     @Setup(Level.Trial)
     public void setup() throws Exception {
       loadEmergencyClasses();
 
-      this.command = Put65.getCommand();
+      this.put65Command = Put65.getCommand();
 
-      this.mockServerConnection = mock(ServerConnection.class);
+      this.mockServerConnection = mock(ServerConnection.class,
+          withSettings().defaultAnswer(CALLS_REAL_METHODS).name("mockServerConnection"));
       when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
 
-      TXManagerImpl txManager = mock(TXManagerImpl.class);
-      GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
-      when(cache.getTxManager()).thenReturn(txManager);
+      GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache"));
+      when(this.mockServerConnection.getCache()).thenReturn(mockCache);
 
-      when(this.mockServerConnection.getCache()).thenReturn(cache);
+      TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager"));
+      when(mockCache.getTxManager()).thenReturn(mockTxManager);
 
-      CacheServerStats cacheServerStats = mock(CacheServerStats.class);
-      when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats);
+      CacheServerStats mockCacheServerStats =
+          mock(CacheServerStats.class, withSettings().name("mockCacheServerStats"));
+      when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats);
 
-      // .getDistributedMember()
-      ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class);
+      ClientProxyMembershipID mockProxyId =
+          mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
       when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
 
-      Message errorResponseMessage = mock(Message.class);
-      when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage);
+      Message mockErrorResponseMessage =
+          mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+      when(this.mockServerConnection.getErrorResponseMessage())
+          .thenReturn(mockErrorResponseMessage);
 
-      Part regionNamePart = mock(Part.class);
-      when(regionNamePart.getString()).thenReturn("regionNamePart");
+      Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+      when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
 
-      Part operationPart = mock(Part.class);
-      when(operationPart.getObject()).thenReturn(Operation.UPDATE);
+      Part mockOperationPart = mock(Part.class);
+      when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
 
-      Part flagsPart = mock(Part.class);
-      when(flagsPart.getInt()).thenReturn(0);
+      Part mockFlagsPart = mock(Part.class);
+      when(mockFlagsPart.getInt()).thenReturn(0);
 
-      Part keyPart = mock(Part.class);
-      when(keyPart.getObject()).thenReturn("keyPart");
-      when(keyPart.getStringOrObject()).thenReturn("keyPart");
+      Part mockKeyPart = mock(Part.class);
+      when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+      when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
 
-      Part isDeltaPart = mock(Part.class);
-      when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+      Part mockIsDeltaPart = mock(Part.class);
+      when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
 
-      Part valuePart = mock(Part.class);
-      when(valuePart.getObject()).thenReturn("valuePart");
+      Part mockValuePart = mock(Part.class);
+      when(mockValuePart.getObject()).thenReturn("mockValuePart");
 
-      Part eventPart = mock(Part.class);
-      when(eventPart.getObject()).thenReturn("eventPart");
+      Part mockEventPart = mock(Part.class);
+      when(mockEventPart.getObject()).thenReturn("mockEventPart");
 
-      Part callbackArgPart = mock(Part.class);
-      when(callbackArgPart.getObject()).thenReturn("callbackArgPart");
+      Part mockCallbackArgPart = mock(Part.class);
+      when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
 
-      message = mock(Message.class);
+      mockMessage = mock(Message.class);
 
-      when(message.getTransactionId()).thenReturn(NOTX);
+      when(mockMessage.getTransactionId()).thenReturn(NOTX);
 
-      when(message.getPart(0)).thenReturn(regionNamePart);
-      when(message.getPart(1)).thenReturn(operationPart);
-      when(message.getPart(2)).thenReturn(flagsPart);
-      when(message.getPart(3)).thenReturn(keyPart);
-      when(message.getPart(4)).thenReturn(isDeltaPart);
-      when(message.getPart(5)).thenReturn(valuePart);
-      when(message.getPart(6)).thenReturn(eventPart);
-      when(message.getPart(7)).thenReturn(callbackArgPart);
+      when(mockMessage.getPart(0)).thenReturn(mockRegionNamePart);
+      when(mockMessage.getPart(1)).thenReturn(mockOperationPart);
+      when(mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+      when(mockMessage.getPart(3)).thenReturn(mockKeyPart);
+      when(mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+      when(mockMessage.getPart(5)).thenReturn(mockValuePart);
+      when(mockMessage.getPart(6)).thenReturn(mockEventPart);
+      when(mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
     }
   }
 
   // @Benchmark
   public void benchmark(ServerConnectionState state, Blackhole blackhole) {
-    state.command.execute(state.message, state.mockServerConnection);
+    state.put65Command.execute(state.mockMessage, state.mockServerConnection);
     // Message replyMessage = state.mockServerConnection.getReplyMessage();
     // blackhole.consume(replyMessage);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index a3c4a93..2294fb8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -317,7 +317,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
         getSocketBufferSize(), getMaximumTimeBetweenPings(), this.cache, getMaxConnections(),
         getMaxThreads(), getMaximumMessageCount(), getMessageTimeToLive(), this.loadMonitor,
         overflowAttributesList, this.isGatewayReceiver, this.gatewayTransportFilters,
-        this.tcpNoDelay);
+        this.tcpNoDelay, this.cache.getCancelCriterion());
 
     this.acceptor.start();
     this.advisor.handshake();

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..97dcba5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -25,7 +25,7 @@ import org.apache.geode.internal.Version;
  *
  * @since GemFire 2.0.2
  */
-public abstract class Acceptor {
+public interface Acceptor {
 
   // The following are communications "mode" bytes sent as the first byte of a
   // client/server handshake. They must not be larger than 1 byte

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ed29472..d8c64f4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -57,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLException;
 
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -97,7 +100,7 @@ import org.apache.geode.internal.util.ArrayUtils;
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable {
+public class AcceptorImpl implements Acceptor, Runnable {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
@@ -283,7 +286,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
-   * 
+   *
+   * @param cancelCriterion checked for cancellation inside the constructor's loops
    * @see SocketCreator#createServerSocket(int, int, InetAddress)
    * @see ClientHealthMonitor
    * @since GemFire 5.7
@@ -292,12 +296,18 @@ public class AcceptorImpl extends Acceptor implements Runnable {
       int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
       int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
       ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
-      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay) throws IOException {
+      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+      final CancelCriterion cancelCriterion) throws IOException {
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
     this.notifyBySubscription = notifyBySubscription;
     this.isGatewayReceiver = isGatewayReceiver;
     this.gatewayTransportFilters = transportFilter;
+
+    this.socketBufferSize = socketBufferSize;
+    this.cache = internalCache;
+    this.crHelper = new CachedRegionHelper(this.cache);
+
     {
       int tmp_maxConnections = maxConnections;
       if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) {
@@ -375,12 +385,6 @@ public class AcceptorImpl extends Acceptor implements Runnable {
             .getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
       }
 
-      final GemFireCacheImpl gc;
-      if (getCachedRegionHelper() != null) {
-        gc = (GemFireCacheImpl) getCachedRegionHelper().getCache();
-      } else {
-        gc = null;
-      }
       final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG).intValue();
       final long tilt = System.currentTimeMillis() + 120 * 1000;
 
@@ -422,9 +426,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
               Thread.currentThread().interrupt();
             }
           }
-          if (gc != null) {
-            gc.getCancelCriterion().checkCancelInProgress(null);
-          }
+          cancelCriterion.checkCancelInProgress(null);
         } // for
       } // isSelector
       else { // !isSelector
@@ -452,9 +454,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
               Thread.currentThread().interrupt();
             }
           }
-          if (gc != null) {
-            gc.getCancelCriterion().checkCancelInProgress(null);
-          }
+          cancelCriterion.checkCancelInProgress(null);
         } // for
       } // !isSelector
 
@@ -485,15 +485,15 @@ public class AcceptorImpl extends Acceptor implements Runnable {
 
     }
 
-    this.cache = internalCache;
-    this.crHelper = new CachedRegionHelper(this.cache);
+    final StatisticsFactory statsFactory =
+        isGatewayReceiver ? new DummyStatisticsFactory() : this.cache.getDistributedSystem();
 
-    this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
-        messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
-    this.socketBufferSize = socketBufferSize;
+    this.clientNotifier =
+        CacheClientNotifier.getInstance(this.cache, this.stats, statsFactory, maximumMessageCount,
+            messageTimeToLive, this.connectionListener, overflowAttributesList, isGatewayReceiver);
 
     // Create the singleton ClientHealthMonitor
-    this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+    this.healthMonitor = ClientHealthMonitor.getInstance(this.cache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
     {

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 28d6ae2..813d569 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.internal.security.SecurityService;
 import org.apache.logging.log4j.Logger;
 import org.apache.shiro.subject.Subject;
 
@@ -79,7 +80,6 @@ import org.apache.geode.distributed.internal.MessageWithReply;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.ClassLoadUtil;
-import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.InternalInstantiator;
 import org.apache.geode.internal.net.SocketCloser;
@@ -113,10 +113,8 @@ import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.InternalLogWriter;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.security.AccessControl;
 import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
@@ -125,32 +123,25 @@ import org.apache.geode.security.AuthenticationRequiredException;
  * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
  * to clients requesting notification of updates and notifies them when updates occur.
  *
- *
  * @since GemFire 3.2
  */
 @SuppressWarnings({"synthetic-access", "deprecation"})
 public class CacheClientNotifier {
   private static final Logger logger = LogService.getLogger();
+  private static final Logger securityLogger = LogService.getSecurityLogger();
 
   private static volatile CacheClientNotifier ccnSingleton;
 
   /**
    * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
-   *
-   * @param cache The GemFire <code>Cache</code>
-   * @param acceptorStats
-   * @param maximumMessageCount
-   * @param messageTimeToLive
-   * @param listener
-   * @param overflowAttributesList
-   * @return A <code>CacheClientNotifier</code> instance
    */
-  public static synchronized CacheClientNotifier getInstance(Cache cache,
-      CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
-      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
+  public static synchronized CacheClientNotifier getInstance(final Cache cache,
+      final CacheServerStats acceptorStats, final StatisticsFactory statsFactory,
+      final int maximumMessageCount, final int messageTimeToLive, final ConnectionListener listener,
+      final List overflowAttributesList, final boolean isGatewayReceiver) {
     if (ccnSingleton == null) {
-      ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
-          messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
+      ccnSingleton = new CacheClientNotifier(cache, acceptorStats, statsFactory,
+          maximumMessageCount, messageTimeToLive, listener);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,13 +149,6 @@ public class CacheClientNotifier {
       // In this case, the HaContainer should be lazily created here
       ccnSingleton.initHaContainer(overflowAttributesList);
     }
-    // else {
-    // ccnSingleton.acceptorStats = acceptorStats;
-    // ccnSingleton.maximumMessageCount = maximumMessageCount;
-    // ccnSingleton.messageTimeToLive = messageTimeToLive;
-    // ccnSingleton._connectionListener = listener;
-    // ccnSingleton.setCache((GemFireCache)cache);
-    // }
     return ccnSingleton;
   }
 
@@ -173,6 +157,51 @@ public class CacheClientNotifier {
   }
 
   /**
+   * Constructor.
+   * 
+   * @param cache The GemFire <code>Cache</code>
+   * @param acceptorStats
+   * @param statsFactory factory used to create the <code>CacheClientNotifierStats</code>
+   * @param maximumMessageCount
+   * @param messageTimeToLive
+   * @param listener a listener which should receive notifications about queues being added or
+   *        removed.
+   */
+  private CacheClientNotifier(final Cache cache, final CacheServerStats acceptorStats,
+      final StatisticsFactory statsFactory, final int maximumMessageCount,
+      final int messageTimeToLive, final ConnectionListener listener) {
+    // Set the Cache
+    this.setCache((GemFireCacheImpl) cache);
+    this.acceptorStats = acceptorStats;
+    // we only need one thread per client and wait 50ms for close
+    this.socketCloser = new SocketCloser(1, 50);
+    this._connectionListener = listener;
+
+    this.maximumMessageCount = maximumMessageCount;
+    this.messageTimeToLive = messageTimeToLive;
+
+    this._statistics = new CacheClientNotifierStats(statsFactory);
+
+    try {
+      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+      if (this.logFrequency <= 0) {
+        this.logFrequency = DEFAULT_LOG_FREQUENCY;
+      }
+    } catch (Exception e) {
+      this.logFrequency = DEFAULT_LOG_FREQUENCY;
+    }
+
+    eventEnqueueWaitTime =
+        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+    if (eventEnqueueWaitTime < 0) {
+      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+    }
+
+    // Schedule task to periodically ping clients.
+    scheduleClientPingTask();
+  }
+
+  /**
    * Writes a given message to the output stream
    *
    * @param dos the <code>DataOutputStream</code> to use for writing the message
@@ -257,32 +286,12 @@ public class CacheClientNotifier {
     writeMessage(dos, type, ex.toString(), clientVersion);
   }
 
-  // /**
-  // * Factory method to return the singleton <code>CacheClientNotifier</code>
-  // * instance.
-  // * @return the singleton <code>CacheClientNotifier</code> instance
-  // */
-  // public static CacheClientNotifier getInstance()
-  // {
-  // return _instance;
-  // }
-
-  // /**
-  // * Shuts down the singleton <code>CacheClientNotifier</code> instance.
-  // */
-  // public static void shutdownInstance()
-  // {
-  // if (_instance == null) return;
-  // _instance.shutdown();
-  // _instance = null;
-  // }
-
   /**
    * Registers a new client updater that wants to receive updates with this server.
    *
    * @param socket The socket over which the server communicates with the client.
    */
-  public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
+  void registerClient(Socket socket, boolean isPrimary, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Since no remote ports were specified in the message, wait for them.
     long startTime = this._statistics.startTime();
@@ -329,7 +338,7 @@ public class CacheClientNotifier {
     }
   }
 
-  protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+  private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
       boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Read the ports and throw them away. We no longer need them
@@ -382,26 +391,27 @@ public class CacheClientNotifier {
       // TODO:hitesh
       Properties credentials = HandShake.readCredentials(dis, dos, system);
       if (credentials != null && proxy != null) {
-        if (securityLogWriter.fineEnabled()) {
-          securityLogWriter
-              .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
+        if (securityLogger.isDebugEnabled()) {
+          securityLogger.debug("CacheClientNotifier: verifying credentials for proxyID: {}",
+              proxyID);
         }
-        Object subject = HandShake.verifyCredentials(authenticator, credentials,
-            system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
+        Object subject =
+            HandShake.verifyCredentials(authenticator, credentials, system.getSecurityProperties(),
+                system.getLogWriter(), system.getSecurityLogWriter(), member);
         if (subject instanceof Principal) {
           Principal principal = (Principal) subject;
-          if (securityLogWriter.fineEnabled()) {
-            securityLogWriter
-                .fine("CacheClientNotifier: successfully verified credentials for proxyID: "
-                    + proxyID + " having principal: " + principal.getName());
+          if (securityLogger.isDebugEnabled()) {
+            securityLogger.debug(
+                "CacheClientNotifier: successfully verified credentials for proxyID: {} having principal: {}",
+                proxyID, principal.getName());
           }
 
           String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
           if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
             if (principal == null) {
-              securityLogWriter.warning(
+              securityLogger.warn(LocalizedMessage.create(
                   LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
-                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
+                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID}));
             }
             Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
             authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
@@ -417,15 +427,15 @@ public class CacheClientNotifier {
           LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
               .toLocalizedString(e));
     } catch (AuthenticationRequiredException ex) {
-      securityLogWriter.warning(
+      securityLogger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
-          new Object[] {proxyID, ex});
+          new Object[] {proxyID, ex}));
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
       return;
     } catch (AuthenticationFailedException ex) {
-      securityLogWriter.warning(
+      securityLogger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
-          new Object[] {proxyID, ex});
+          new Object[] {proxyID, ex}));
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
       return;
     } catch (CacheException e) {
@@ -445,11 +455,9 @@ public class CacheClientNotifier {
       return;
     }
 
-
     this._statistics.endClientRegistration(startTime);
   }
 
-
   /**
    * Registers a new client that wants to receive updates with this server.
    *
@@ -504,8 +512,9 @@ public class CacheClientNotifier {
               "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
               proxyId.getDurableId());
         }
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
-            clientVersion, acceptorId, notifyBySubscription);
+        l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache,
+            this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket,
+            proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
         successful = this.initializeProxy(l_proxy);
       } else {
         if (proxy.isPrimary()) {
@@ -516,8 +525,8 @@ public class CacheClientNotifier {
         qSize = proxy.getQueueSize();
         // A proxy exists for this durable client. It must be reinitialized.
         if (l_proxy.isPaused()) {
-          if (CacheClientProxy.testHook != null) {
-            CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
+          if (CacheClientProxy.getTestHook() != null) {
+            CacheClientProxy.getTestHook().doTestHook("CLIENT_PRE_RECONNECT");
           }
           if (l_proxy.lockDrain()) {
             try {
@@ -531,8 +540,8 @@ public class CacheClientNotifier {
               l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
                   clientVersion);
               l_proxy.setMarkerEnqueued(true);
-              if (CacheClientProxy.testHook != null) {
-                CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
+              if (CacheClientProxy.getTestHook() != null) {
+                CacheClientProxy.getTestHook().doTestHook("CLIENT_RECONNECTED");
               }
             } finally {
               l_proxy.unlockDrain();
@@ -543,8 +552,8 @@ public class CacheClientNotifier {
                     .toLocalizedString();
             logger.warn(unsuccessfulMsg);
             responseByte = HandShake.REPLY_REFUSED;
-            if (CacheClientProxy.testHook != null) {
-              CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
+            if (CacheClientProxy.getTestHook() != null) {
+              CacheClientProxy.getTestHook().doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
             }
           }
         } else {
@@ -582,8 +591,9 @@ public class CacheClientNotifier {
 
       if (toCreateNewProxy) {
         // Create the new proxy for this non-durable client
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
-            clientVersion, acceptorId, notifyBySubscription);
+        l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache,
+            this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket,
+            proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
         successful = this.initializeProxy(l_proxy);
       }
     }
@@ -754,10 +764,8 @@ public class CacheClientNotifier {
    * Unregisters an existing client from this server.
    *
    * @param memberId Uniquely identifies the client
-   *
-   *
    */
-  public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+  void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
     if (logger.isDebugEnabled()) {
       logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
     }
@@ -781,8 +789,6 @@ public class CacheClientNotifier {
 
   /**
    * The client represented by the proxyId is ready to receive updates.
-   *
-   * @param proxyId
    */
   public void readyForEvents(ClientProxyMembershipID proxyId) {
     CacheClientProxy proxy = getClientProxy(proxyId);
@@ -817,7 +823,6 @@ public class CacheClientNotifier {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonNotifyClients(event, null);
-
     }
   }
 
@@ -829,7 +834,6 @@ public class CacheClientNotifier {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonNotifyClients(event, cmsg);
-
     }
   }
 
@@ -839,10 +843,6 @@ public class CacheClientNotifier {
 
     FilterInfo filterInfo = event.getLocalFilterInfo();
 
-    // if (_logger.fineEnabled()) {
-    // _logger.fine("Client dispatcher processing event " + event);
-    // }
-
     FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
     if (filterInfo != null) {
       // if the routing was made using an old profile we need to recompute it
@@ -964,10 +964,8 @@ public class CacheClientNotifier {
     if (filterInfo.filterProcessedLocally) {
       removeDestroyTokensFromCqResultKeys(event, filterInfo);
     }
-
   }
 
-
   private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event,
       FilterInfo filterInfo) {
     FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
@@ -986,38 +984,22 @@ public class CacheClientNotifier {
     }
   }
 
-
   /**
    * delivers the given message to all proxies for routing. The message should already have client
    * interest established, or override the isClientInterested method to implement its own routing
-   * 
-   * @param clientMessage
    */
   public static void routeClientMessage(Conflatable clientMessage) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
-                                                                                             // to
-                                                                                             // use
-                                                                                             // keySet
-                                                                                             // here
-                                                                                             // because
-                                                                                             // all
-                                                                                             // we
-                                                                                             // do
-                                                                                             // is
-                                                                                             // call
-                                                                                             // getClientProxy
-                                                                                             // with
-                                                                                             // these
-                                                                                             // keys
+      // ok to use keySet here because all we do is call getClientProxy with these keys
+      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet());
     }
   }
 
   /*
    * this is for server side registration of client queue
    */
-  public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+  static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
       ClientProxyMembershipID clientProxyMembershipId) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
@@ -1029,8 +1011,8 @@ public class CacheClientNotifier {
   private void singletonRouteClientMessage(Conflatable conflatable,
       Collection<ClientProxyMembershipID> filterClients) {
 
-    this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
-                                                                  // but no p2p distribution
+    // bug #43942 - client notified but no p2p distribution
+    this._cache.getCancelCriterion().checkCancelInProgress(null);
 
     List<CacheClientProxy> deadProxies = null;
     for (ClientProxyMembershipID clientId : filterClients) {
@@ -1061,7 +1043,8 @@ public class CacheClientNotifier {
    * processes the given collection of durable and non-durable client identifiers, returning a
    * collection of non-durable identifiers of clients connected to this VM
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+    // TODO: false is ignored here because true is hardcoded in other method
     return getProxyIDs(mixedDurableAndNonDurableIDs, false);
   }
 
@@ -1070,7 +1053,7 @@ public class CacheClientNotifier {
    * collection of non-durable identifiers of clients connected to this VM. This version can check
    * for proxies in initialization as well as fully initialized proxies.
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
       boolean proxyInInitMode) {
     Set<ClientProxyMembershipID> result = new HashSet();
     for (Object id : mixedDurableAndNonDurableIDs) {
@@ -1209,7 +1192,7 @@ public class CacheClientNotifier {
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
    * @return whether the <code>CacheClientNotifier</code> supports the input operation
    */
-  protected boolean supportsOperation(EnumListenerEvent operation) {
+  private boolean supportsOperation(EnumListenerEvent operation) {
     return operation == EnumListenerEvent.AFTER_CREATE
         || operation == EnumListenerEvent.AFTER_UPDATE
         || operation == EnumListenerEvent.AFTER_DESTROY
@@ -1219,87 +1202,6 @@ public class CacheClientNotifier {
         || operation == EnumListenerEvent.AFTER_REGION_INVALIDATE;
   }
 
-  // /**
-  // * Queues the <code>ClientUpdateMessage</code> to be distributed
-  // * to interested clients. This method is not being used currently.
-  // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued
-  // */
-  // protected void notifyClients(final ClientUpdateMessage clientMessage)
-  // {
-  // if (USE_SYNCHRONOUS_NOTIFICATION)
-  // {
-  // // Execute the method in the same thread as the caller
-  // deliver(clientMessage);
-  // }
-  // else {
-  // // Obtain an Executor and use it to execute the method in its own thread
-  // try
-  // {
-  // getExecutor().execute(new Runnable()
-  // {
-  // public void run()
-  // {
-  // deliver(clientMessage);
-  // }
-  // }
-  // );
-  // } catch (InterruptedException e)
-  // {
-  // _logger.warning("CacheClientNotifier: notifyClients interrupted", e);
-  // Thread.currentThread().interrupt();
-  // }
-  // }
-  // }
-
-  // /**
-  // * Updates the information this <code>CacheClientNotifier</code> maintains
-  // * for a given edge client. It is invoked when a edge client re-connects to
-  // * the server.
-  // *
-  // * @param clientHost
-  // * The host on which the client runs (i.e. the host the
-  // * CacheClientNotifier uses to communicate with the
-  // * CacheClientUpdater) This is used with the clientPort to uniquely
-  // * identify the client
-  // * @param clientPort
-  // * The port through which the server communicates with the client
-  // * (i.e. the port the CacheClientNotifier uses to communicate with
-  // * the CacheClientUpdater) This is used with the clientHost to
-  // * uniquely identify the client
-  // * @param remotePort
-  // * The port through which the client communicates with the server
-  // * (i.e. the new port the ConnectionImpl uses to communicate with the
-  // * ServerConnection)
-  // * @param membershipID
-  // * Uniquely idenifies the client
-  // */
-  // public void registerClientPort(String clientHost, int clientPort,
-  // int remotePort, ClientProxyMembershipID membershipID)
-  // {
-  // if (_logger.fineEnabled())
-  // _logger.fine("CacheClientNotifier: Registering client port: "
-  // + clientHost + ":" + clientPort + " with remote port " + remotePort
-  // + " and ID " + membershipID);
-  // for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-  // CacheClientProxy proxy = (CacheClientProxy)i.next();
-  // if (_logger.finerEnabled())
-  // _logger.finer("CacheClientNotifier: Potential client: " + proxy);
-  // //if (proxy.representsCacheClientUpdater(clientHost, clientPort))
-  // if (proxy.isMember(membershipID)) {
-  // if (_logger.finerEnabled())
-  // _logger
-  // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match");
-  // proxy.addPort(remotePort);
-  // }
-  // else {
-  // if (_logger.finerEnabled())
-  // _logger.finer("CacheClientNotifier: Host and port "
-  // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort()
-  // + " do not match " + clientHost + ":" + clientPort);
-  // }
-  // }
-  // }
-
   /**
    * Registers client interest in the input region and key.
    *
@@ -1350,18 +1252,6 @@ public class CacheClientNotifier {
     }
   }
 
-  /*
-   * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID
-   * membershipID) throws RegionNotFoundException { // Update Regions book keeping. LocalRegion
-   * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new
-   * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 //
-   * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " +
-   * membershipID + " :Throwing RegionDestroyedException as region: " + regionName +
-   * " is not present."); } throw new RegionDestroyedException("registerInterest failed",
-   * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID);
-   * } }
-   */
-
   /**
    * Store region and delta relation
    * 
@@ -1457,7 +1347,6 @@ public class CacheClientNotifier {
     }
   }
 
-
   /**
    * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
    * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
@@ -1484,9 +1373,6 @@ public class CacheClientNotifier {
             }
           }
         }
-        // else {
-        // This is a replay-of-event case.
-        // }
       } else {
         // This wrapper resides in haContainer.
         wrapper.setClientUpdateMessage(null);
@@ -1541,7 +1427,7 @@ public class CacheClientNotifier {
    * 
    * @return the <code>CacheClientProxy</code> associated to the durableClientId
    */
-  public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
+  private CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final boolean isTraceEnabled = logger.isTraceEnabled();
 
@@ -1584,46 +1470,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
-   * 
-   * @return the <code>CacheClientProxy</code> associated to the same distributed system
-   */
-  public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (isDebugEnabled) {
-      logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
-          membershipID);
-      logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
-          this, getClientProxies().size());
-      /*
-       * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " +
-       * getClientProxies());
-       */
-    }
-    CacheClientProxy proxy = null;
-    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled) {
-        logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
-      }
-      if (clientProxy.isSameDSMember(membershipID)) {
-        proxy = clientProxy;
-        if (isDebugEnabled) {
-          logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
-              membershipID);
-        }
-        break;
-      }
-    }
-    return proxy;
-  }
-
-
-  /**
    * It will remove the clients connected to the passed acceptorId. If its the only server, shuts
    * down this instance.
    */
-  protected synchronized void shutdown(long acceptorId) {
+  synchronized void shutdown(long acceptorId) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
@@ -1685,14 +1535,14 @@ public class CacheClientNotifier {
    *
    * @param proxy The <code>CacheClientProxy</code> to add
    */
-  protected void addClientProxy(CacheClientProxy proxy) throws IOException {
+  private void addClientProxy(CacheClientProxy proxy) throws IOException {
     // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
     getCache(); // ensure cache reference is up to date so firstclient state is correct
     this._clientProxies.put(proxy.getProxyID(), proxy);
     // Remove this proxy from the init proxy list.
     removeClientInitProxy(proxy);
     this._connectionListener.queueAdded(proxy.getProxyID());
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    if (!(proxy.isClientConflationOn())) {
       // Delta not supported with conflation ON
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       /*
@@ -1704,22 +1554,20 @@ public class CacheClientNotifier {
       }
     }
     this.timedOutDurableClientProxies.remove(proxy.getProxyID());
-
   }
 
-  protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
+  private void addClientInitProxy(CacheClientProxy proxy) throws IOException {
     this._initClientProxies.put(proxy.getProxyID(), proxy);
   }
 
-  protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
+  private void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
     this._initClientProxies.remove(proxy.getProxyID());
   }
 
-  protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
+  private boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
     return this._initClientProxies.containsKey(proxy.getProxyID());
   }
 
-
   /**
    * Returns (possibly stale) set of memberIds for all clients being actively notified by this
    * server.
@@ -1781,7 +1629,6 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasPrimaryForDurableClient(String durableId) {
-
     for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
       CacheClientProxy proxy = (CacheClientProxy) iter.next();
       ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1818,7 +1665,9 @@ public class CacheClientNotifier {
     return ccp.getQueueSizeStat();
   }
 
-  // closes the cq and drains the queue
+  /**
+   * closes the cq and drains the queue
+   */
   public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
     CacheClientProxy proxy = getClientProxy(durableClientId);
     // close and drain
@@ -1828,33 +1677,29 @@ public class CacheClientNotifier {
     return false;
   }
 
-
   /**
    * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
    *
    * @param proxy The <code>CacheClientProxy</code> to remove
    */
-  protected void removeClientProxy(CacheClientProxy proxy) {
-    // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
-    // Exception("stack trace"));
+  void removeClientProxy(CacheClientProxy proxy) {
     ClientProxyMembershipID client = proxy.getProxyID();
     this._clientProxies.remove(client);
     this._connectionListener.queueRemoved();
     ((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client);
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    if (!(proxy.isClientConflationOn())) {
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       if (chm != null) {
         chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
       }
     }
-
   }
 
   void durableClientTimedOut(ClientProxyMembershipID client) {
     this.timedOutDurableClientProxies.add(client);
   }
 
-  public boolean isTimedOut(ClientProxyMembershipID client) {
+  private boolean isTimedOut(ClientProxyMembershipID client) {
     return this.timedOutDurableClientProxies.contains(client);
   }
 
@@ -1868,17 +1713,6 @@ public class CacheClientNotifier {
     return Collections.unmodifiableCollection(this._clientProxies.values());
   }
 
-  // /**
-  // * Returns the <code>Executor</code> that delivers messages to the
-  // * <code>CacheClientProxy</code> instances.
-  // * @return the <code>Executor</code> that delivers messages to the
-  // * <code>CacheClientProxy</code> instances
-  // */
-  // protected Executor getExecutor()
-  // {
-  // return _executor;
-  // }
-
   private void closeAllClientCqs(CacheClientProxy proxy) {
     CqService cqService = proxy.getCache().getCqService();
     if (cqService != null) {
@@ -1901,7 +1735,6 @@ public class CacheClientNotifier {
 
   /**
    * Shuts down durable client proxy
-   *
    */
   public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
     CacheClientProxy ccp = getClientProxy(durableClientId);
@@ -1930,8 +1763,9 @@ public class CacheClientNotifier {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     for (Iterator i = deadProxies.iterator(); i.hasNext();) {
       CacheClientProxy proxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled)
+      if (isDebugEnabled) {
         logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+      }
 
       // Close the proxy
       boolean keepProxy = false;
@@ -1939,7 +1773,7 @@ public class CacheClientNotifier {
         keepProxy = proxy.close(false, stoppedNormally);
       } catch (CancelException e) {
         throw e;
-      } catch (Exception e) {
+      } catch (Exception e) { // TODO: at least log at debug level
       }
 
       // Remove the proxy if necessary. It might not be necessary to remove the
@@ -1960,7 +1794,6 @@ public class CacheClientNotifier {
     } // for
   }
 
-
   /**
    * Registers a new <code>InterestRegistrationListener</code> with the set of
    * <code>InterestRegistrationListener</code>s.
@@ -1999,18 +1832,16 @@ public class CacheClientNotifier {
   }
 
   /**
-   * 
    * @since GemFire 5.8Beta
    */
-  protected boolean containsInterestRegistrationListeners() {
+  boolean containsInterestRegistrationListeners() {
     return !this.writableInterestRegistrationListeners.isEmpty();
   }
 
   /**
-   * 
    * @since GemFire 5.8Beta
    */
-  protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+  void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
     for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
       InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
       if (event.isRegister()) {
@@ -2040,8 +1871,6 @@ public class CacheClientNotifier {
       GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
         this._cache = cache;
-        this.logWriter = cache.getInternalLogWriter();
-        this.securityLogWriter = cache.getSecurityInternalLogWriter();
       }
     }
     return this._cache;
@@ -2072,68 +1901,6 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Constructor.
-   *
-   * @param cache The GemFire <code>Cache</code>
-   * @param acceptorStats
-   * @param maximumMessageCount
-   * @param messageTimeToLive
-   * @param listener a listener which should receive notifications abouts queues being added or
-   *        removed.
-   * @param overflowAttributesList
-   */
-  private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount,
-      int messageTimeToLive, ConnectionListener listener, List overflowAttributesList,
-      boolean isGatewayReceiver) {
-    // Set the Cache
-    this.setCache((GemFireCacheImpl) cache);
-    this.acceptorStats = acceptorStats;
-    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
-                                                 // for close
-
-    // Set the LogWriter
-    this.logWriter = (InternalLogWriter) cache.getLogger();
-
-    this._connectionListener = listener;
-
-    // Set the security LogWriter
-    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
-    this.maximumMessageCount = maximumMessageCount;
-    this.messageTimeToLive = messageTimeToLive;
-
-    // Initialize the statistics
-    StatisticsFactory factory;
-    if (isGatewayReceiver) {
-      factory = new DummyStatisticsFactory();
-    } else {
-      factory = this.getCache().getDistributedSystem();
-    }
-    this._statistics = new CacheClientNotifierStats(factory);
-
-    // Initialize the executors
-    // initializeExecutors(this._logger);
-
-    try {
-      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
-      if (this.logFrequency <= 0) {
-        this.logFrequency = DEFAULT_LOG_FREQUENCY;
-      }
-    } catch (Exception e) {
-      this.logFrequency = DEFAULT_LOG_FREQUENCY;
-    }
-
-    eventEnqueueWaitTime =
-        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
-    if (eventEnqueueWaitTime < 0) {
-      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
-    }
-
-    // Schedule task to periodically ping clients.
-    scheduleClientPingTask();
-  }
-
-  /**
    * this message is used to send interest registration to another server. Since interest
    * registration performs a state-flush operation this message must not transmitted on an ordered
    * socket
@@ -2228,104 +1995,6 @@ public class CacheClientNotifier {
 
   }
 
-
-  // * Initializes the <code>QueuedExecutor</code> and
-  // <code>PooledExecutor</code>
-  // * used to deliver messages to <code>CacheClientProxy</code> instances.
-  // * @param logger The GemFire <code>LogWriterI18n</code>
-  // */
-  // private void initializeExecutors(LogWriterI18n logger)
-  // {
-  // // Create the thread groups
-  // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache
-  // Client Notifier Logger Group", logger);
-  // final ThreadGroup notifierGroup =
-  // new ThreadGroup("Cache Client Notifier Group")
-  // {
-  // public void uncaughtException(Thread t, Throwable e)
-  // {
-  // Thread.dumpStack();
-  // loggerGroup.uncaughtException(t, e);
-  // //CacheClientNotifier.exceptionInThreads = true;
-  // }
-  // };
-  //
-  // // Originally set ThreadGroup to be a daemon, but it was causing the
-  // following
-  // // exception after five minutes of non-activity (the keep alive time of the
-  // // threads in the PooledExecutor.
-  //
-  // // java.lang.IllegalThreadStateException
-  // // at java.lang.ThreadGroup.add(Unknown Source)
-  // // at java.lang.Thread.init(Unknown Source)
-  // // at java.lang.Thread.<init>(Unknown Source)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321)
-  // // at
-  // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512)
-  // // at
-  // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95)
-  // // at
-  // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271)
-  //
-  // //notifierGroup.setDaemon(true);
-  //
-  // if (USE_QUEUED_EXECUTOR)
-  // createQueuedExecutor(notifierGroup);
-  // else
-  // createPooledExecutor(notifierGroup);
-  // }
-
-  // /**
-  // * Creates the <code>QueuedExecutor</code> used to deliver messages
-  // * to <code>CacheClientProxy</code> instances
-  // * @param notifierGroup The <code>ThreadGroup</code> to which the
-  // * <code>QueuedExecutor</code>'s <code>Threads</code> belong
-  // */
-  // protected void createQueuedExecutor(final ThreadGroup notifierGroup)
-  // {
-  // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue());
-  // queuedExecutor.setThreadFactory(new ThreadFactory()
-  // {
-  // public Thread newThread(Runnable command)
-  // {
-  // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client
-  // Notifier");
-  // thread.setDaemon(true);
-  // return thread;
-  // }
-  // });
-  // _executor = queuedExecutor;
-  // }
-
-  // /**
-  // * Creates the <code>PooledExecutor</code> used to deliver messages
-  // * to <code>CacheClientProxy</code> instances
-  // * @param notifierGroup The <code>ThreadGroup</code> to which the
-  // * <code>PooledExecutor</code>'s <code>Threads</code> belong
-  // */
-  // protected void createPooledExecutor(final ThreadGroup notifierGroup)
-  // {
-  // PooledExecutor pooledExecutor = new PooledExecutor(new
-  // BoundedLinkedQueue(4096), 50);
-  // pooledExecutor.setMinimumPoolSize(10);
-  // pooledExecutor.setKeepAliveTime(1000 * 60 * 5);
-  // pooledExecutor.setThreadFactory(new ThreadFactory()
-  // {
-  // public Thread newThread(Runnable command)
-  // {
-  // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client
-  // Notifier");
-  // thread.setDaemon(true);
-  // return thread;
-  // }
-  // });
-  // pooledExecutor.createThreads(5);
-  // _executor = pooledExecutor;
-  // }
-
   protected void deliverInterestChange(ClientProxyMembershipID proxyID,
       ClientInterestMessageImpl message) {
     DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
@@ -2471,23 +2140,6 @@ public class CacheClientNotifier {
    */
   protected static final int ALL_PORTS = -1;
 
-  // /**
-  // * Whether to synchonously deliver messages to proxies.
-  // * This is currently hard-coded to true to ensure ordering.
-  // */
-  // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION =
-  // true;
-  // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION");
-
-  // /**
-  // * Whether to use the <code>QueuedExecutor</code> (or the
-  // * <code>PooledExecutor</code>) to deliver messages to proxies.
-  // * Currently, delivery is synchronous. No <code>Executor</code> is
-  // * used.
-  // */
-  // protected static final boolean USE_QUEUED_EXECUTOR =
-  // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR");
-
   /**
    * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
    * CacheClientProxy. Note that the keys in this map are not updated when a durable client
@@ -2512,14 +2164,7 @@ public class CacheClientNotifier {
    * direct reference to _cache in CacheClientNotifier code. Instead, you should always use
    * <code>getCache()</code>
    */
-  private GemFireCacheImpl _cache;
-
-  private InternalLogWriter logWriter;
-
-  /**
-   * The GemFire security <code>LogWriter</code>
-   */
-  private InternalLogWriter securityLogWriter;
+  private GemFireCacheImpl _cache; // TODO: not thread-safe
 
   /** the maximum number of messages that can be enqueued in a client-queue. */
   private int maximumMessageCount;
@@ -2543,10 +2188,6 @@ public class CacheClientNotifier {
    */
   private volatile HAContainerWrapper haContainer;
 
-  // /**
-  // * The singleton <code>CacheClientNotifier</code> instance
-  // */
-  // protected static CacheClientNotifier _instance;
   /**
    * The size of the server-to-client communication socket buffers. This can be modified using the
    * BridgeServer.SOCKET_BUFFER_SIZE system property.


[05/12] geode git commit: GEODE-2716: export logs default behavior changed from filtering at log level INFO to ALL.

Posted by kl...@apache.org.
GEODE-2716: export logs default behavior changed from filtering at log level INFO to ALL.

* Removed reliance on LogService.DEFAULT_LOG_LEVEL, added ExportLogsCommand.DEFAULT_EXPORT_LOG_LEVEL.
* Added DUnit test that fails under previous behavior.
* this closes #439


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5430c91f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5430c91f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5430c91f

Branch: refs/heads/feature/GEODE-2632
Commit: 5430c91f61fdb9ae4b0d11758d2b8e5a18f52163
Parents: 669d3ed
Author: Patrick Rhomberg <pr...@pivotal.io>
Authored: Tue Mar 28 15:12:02 2017 -0700
Committer: Jinmei Liao <ji...@pivotal.io>
Committed: Thu Apr 6 10:17:36 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/logging/LogService.java      |  1 -
 .../cli/commands/ExportLogsCommand.java         |  4 +-
 .../cli/functions/ExportLogsFunction.java       |  2 +-
 .../internal/cli/i18n/CliStrings.java           |  4 +-
 .../cli/commands/ExportLogsDUnitTest.java       |  7 +++
 .../cli/functions/ExportLogsFunctionTest.java   | 45 ++++++++++++++++++++
 .../cli/commands/golden-help-offline.properties | 11 ++---
 7 files changed, 64 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
index 1f8a564..5a229d3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
@@ -46,7 +46,6 @@ public class LogService extends LogManager {
   public static final String BASE_LOGGER_NAME = "org.apache.geode";
   public static final String MAIN_LOGGER_NAME = "org.apache.geode";
   public static final String SECURITY_LOGGER_NAME = "org.apache.geode.security";
-  public static final String DEFAULT_LOG_LEVEL = "INFO";
 
   public static final String GEODE_VERBOSE_FILTER = "{GEODE_VERBOSE}";
   public static final String GEMFIRE_VERBOSE_FILTER = "{GEMFIRE_VERBOSE}";

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ExportLogsCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ExportLogsCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ExportLogsCommand.java
index fe9cecd..1d5c412 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ExportLogsCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ExportLogsCommand.java
@@ -57,6 +57,8 @@ public class ExportLogsCommand implements CommandMarker {
   public static final String FORMAT = "yyyy/MM/dd/HH/mm/ss/SSS/z";
   public static final String ONLY_DATE_FORMAT = "yyyy/MM/dd";
 
+  public final static String DEFAULT_EXPORT_LOG_LEVEL = "ALL";
+
   private static final Pattern DISK_SPACE_LIMIT_PATTERN = Pattern.compile("(\\d+)([mgtMGT]?)");
 
   @CliCommand(value = CliStrings.EXPORT_LOGS, help = CliStrings.EXPORT_LOGS__HELP)
@@ -77,7 +79,7 @@ public class ExportLogsCommand implements CommandMarker {
           optionContext = ConverterHint.ALL_MEMBER_IDNAME,
           help = CliStrings.EXPORT_LOGS__MEMBER__HELP) String[] memberIds,
       @CliOption(key = CliStrings.EXPORT_LOGS__LOGLEVEL,
-          unspecifiedDefaultValue = LogService.DEFAULT_LOG_LEVEL,
+          unspecifiedDefaultValue = DEFAULT_EXPORT_LOG_LEVEL,
           optionContext = ConverterHint.LOG_LEVEL,
           help = CliStrings.EXPORT_LOGS__LOGLEVEL__HELP) String logLevel,
       @CliOption(key = CliStrings.EXPORT_LOGS__UPTO_LOGLEVEL, unspecifiedDefaultValue = "false",

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunction.java
index 13124c5..3ce1721 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunction.java
@@ -168,7 +168,7 @@ public class ExportLogsFunction implements Function, InternalEntity {
       this.endTime = parseTime(endTime);
 
       if (StringUtils.isBlank(logLevel)) {
-        this.logLevel = Level.INFO;
+        this.logLevel = LogLevel.getLevel(ExportLogsCommand.DEFAULT_EXPORT_LOG_LEVEL);
       } else {
         this.logLevel = LogLevel.getLevel(logLevel);
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index dfc4cf1..67955dc 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -1391,7 +1391,7 @@ public class CliStrings {
   public static final String EXPORT_LOGS__HELP = "Export the log files for a member or members.";
   public static final String EXPORT_LOGS__DIR = "dir";
   public static final String EXPORT_LOGS__DIR__HELP =
-      "Local directory to which logs will be written. This is used only when you are exporting logs using an http connection. If not specified, logs are written to the location specified by the user.dir system property.";
+      "Directory to which logs will be written.  This refers to a local directory when exporting logs using an http connection, but refers to the filesystem of the manager when connected via JMX. If not specified, logs are written to the location specified by the user.dir system property.";
   public static final String EXPORT_LOGS__MEMBER = "member";
   public static final String EXPORT_LOGS__MEMBER__HELP =
       "Name/Id of the member whose log files will be exported.";
@@ -1401,7 +1401,7 @@ public class CliStrings {
   public static final String EXPORT_LOGS__MSG__CANNOT_EXECUTE = "Cannot execute";
   public static final String EXPORT_LOGS__LOGLEVEL = LOG_LEVEL;
   public static final String EXPORT_LOGS__LOGLEVEL__HELP =
-      "Minimum level of log entries to export. Valid values are: fatal, error, warn, info, debug, trace and all.  The default is \"INFO\".";
+      "Minimum level of log entries to export. Valid values are: fatal, error, warn, info, debug, trace and all.  The default is \"ALL\".";
   public static final String EXPORT_LOGS__UPTO_LOGLEVEL = "only-log-level";
   public static final String EXPORT_LOGS__UPTO_LOGLEVEL__HELP =
       "Whether to only include those entries that exactly match the --log-level specified.";

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ExportLogsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ExportLogsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ExportLogsDUnitTest.java
index d0180d0..c960c8f 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ExportLogsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ExportLogsDUnitTest.java
@@ -177,6 +177,13 @@ public class ExportLogsDUnitTest {
   }
 
   @Test
+  public void testExportWithNoOptionsGiven() throws Exception {
+    CommandResult result = gfshConnector.executeAndVerifyCommand("export logs");
+    Set<String> acceptedLogLevels = Stream.of("info", "error", "debug").collect(toSet());
+    verifyZipFileContents(acceptedLogLevels);
+  }
+
+  @Test
   public void testExportWithNoFilters() throws Exception {
     gfshConnector.executeAndVerifyCommand("export logs --log-level=all");
 

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunctionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunctionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunctionTest.java
new file mode 100644
index 0000000..4e72444
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/ExportLogsFunctionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.internal.logging.log4j.LogLevel;
+import org.apache.geode.management.internal.cli.commands.ExportLogsCommand;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.apache.logging.log4j.Level;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class ExportLogsFunctionTest {
+  @Test
+  public void defaultExportLogLevelShouldBeAll() throws Exception {
+    assertTrue(ExportLogsCommand.DEFAULT_EXPORT_LOG_LEVEL.equals("ALL"));
+    assertEquals(LogLevel.getLevel(ExportLogsCommand.DEFAULT_EXPORT_LOG_LEVEL), Level.ALL);
+  }
+
+  @Test
+  public void defaultExportLogLevelShouldBeAllViaArgs() throws Exception {
+    ExportLogsFunction.Args args = new ExportLogsFunction.Args("", "", "", false, false, false);
+    assertEquals(args.getLogLevel(), Level.ALL);
+    ExportLogsFunction.Args args2 = new ExportLogsFunction.Args("", "", null, false, false, false);
+    assertEquals(args2.getLogLevel(), Level.ALL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/5430c91f/geode-core/src/test/resources/org/apache/geode/management/internal/cli/commands/golden-help-offline.properties
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/org/apache/geode/management/internal/cli/commands/golden-help-offline.properties b/geode-core/src/test/resources/org/apache/geode/management/internal/cli/commands/golden-help-offline.properties
index 3c56def..af6b4bf 100644
--- a/geode-core/src/test/resources/org/apache/geode/management/internal/cli/commands/golden-help-offline.properties
+++ b/geode-core/src/test/resources/org/apache/geode/management/internal/cli/commands/golden-help-offline.properties
@@ -1473,9 +1473,10 @@ SYNTAX\n\
 \ \ \ \ [--end-time=value] [--logs-only(=value)?] [--stats-only(=value)?]\n\
 PARAMETERS\n\
 \ \ \ \ dir\n\
-\ \ \ \ \ \ \ \ Local directory to which logs will be written. This is used only when you are exporting\n\
-\ \ \ \ \ \ \ \ logs using an http connection. If not specified, logs are written to the location specified\n\
-\ \ \ \ \ \ \ \ by the user.dir system property.\n\
+\ \ \ \ \ \ \ \ Directory to which logs will be written.  This refers to a local directory when exporting\n\
+\ \ \ \ \ \ \ \ logs using an http connection, but refers to the filesystem of the manager when connected\n\
+\ \ \ \ \ \ \ \ via JMX. If not specified, logs are written to the location specified by the user.dir\n\
+\ \ \ \ \ \ \ \ system property.\n\
 \ \ \ \ \ \ \ \ Required: false\n\
 \ \ \ \ group\n\
 \ \ \ \ \ \ \ \ Group of members whose log files will be exported.\n\
@@ -1485,9 +1486,9 @@ PARAMETERS\n\
 \ \ \ \ \ \ \ \ Required: false\n\
 \ \ \ \ log-level\n\
 \ \ \ \ \ \ \ \ Minimum level of log entries to export. Valid values are: fatal, error, warn, info, debug,\n\
-\ \ \ \ \ \ \ \ trace and all.  The default is "INFO".\n\
+\ \ \ \ \ \ \ \ trace and all.  The default is "ALL".\n\
 \ \ \ \ \ \ \ \ Required: false\n\
-\ \ \ \ \ \ \ \ Default (if the parameter is not specified): INFO\n\
+\ \ \ \ \ \ \ \ Default (if the parameter is not specified): ALL\n\
 \ \ \ \ only-log-level\n\
 \ \ \ \ \ \ \ \ Whether to only include those entries that exactly match the --log-level specified.\n\
 \ \ \ \ \ \ \ \ Required: false\n\


[06/12] geode git commit: GEODE-2756: Do not put security-* properties in the env.

Posted by kl...@apache.org.
GEODE-2756: Do not put security-* properties in the env.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/19376d30
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/19376d30
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/19376d30

Branch: refs/heads/feature/GEODE-2632
Commit: 19376d3069a7808481b2ed572ea49e3610dc9df1
Parents: 5430c91
Author: Jinmei Liao <ji...@pivotal.io>
Authored: Thu Apr 6 09:50:57 2017 -0700
Committer: Jinmei Liao <ji...@pivotal.io>
Committed: Thu Apr 6 14:16:27 2017 -0700

----------------------------------------------------------------------
 .../support/LoginHandlerInterceptor.java        | 27 +++--------
 .../LoginHandlerInterceptorJUnitTest.java       | 51 +++++++++++++-------
 ...andlerInterceptorRequestHeaderJUnitTest.java | 28 ++++++++---
 3 files changed, 60 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/19376d30/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptor.java b/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptor.java
index 9e05174..56d9b9e 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptor.java
@@ -22,7 +22,6 @@ import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.management.internal.cli.multistep.CLIMultiStepHelper;
 import org.apache.geode.management.internal.security.ResourceConstants;
 import org.apache.geode.management.internal.web.util.UriUtils;
-import org.apache.geode.security.Authenticator;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
 
@@ -51,8 +50,6 @@ public class LoginHandlerInterceptor extends HandlerInterceptorAdapter {
 
   private Cache cache;
 
-  private Authenticator auth = null;
-
   private SecurityService securityService = IntegratedSecurityService.getSecurityService();
 
   private static final ThreadLocal<Map<String, String>> ENV =
@@ -93,22 +90,10 @@ public class LoginHandlerInterceptor extends HandlerInterceptorAdapter {
       }
     }
 
+    ENV.set(requestParameterValues);
 
-
-    for (Enumeration<String> requestHeaders = request.getHeaderNames(); requestHeaders
-        .hasMoreElements();) {
-
-      // since http request headers are case-insensitive and all our security-* properties
-      // are in lower case, it's safe to do toLowerCase here.
-      final String requestHeader = requestHeaders.nextElement().toLowerCase();
-
-      if (requestHeader.startsWith(SECURITY_VARIABLE_REQUEST_HEADER_PREFIX)) {
-        requestParameterValues.put(requestHeader, request.getHeader(requestHeader));
-      }
-    }
-
-    String username = requestParameterValues.get(ResourceConstants.USER_NAME);
-    String password = requestParameterValues.get(ResourceConstants.PASSWORD);
+    String username = request.getHeader(ResourceConstants.USER_NAME);
+    String password = request.getHeader(ResourceConstants.PASSWORD);
     Properties credentials = new Properties();
     if (username != null)
       credentials.put(ResourceConstants.USER_NAME, username);
@@ -116,11 +101,13 @@ public class LoginHandlerInterceptor extends HandlerInterceptorAdapter {
       credentials.put(ResourceConstants.PASSWORD, password);
     this.securityService.login(credentials);
 
-    ENV.set(requestParameterValues);
-
     return true;
   }
 
+  public void setSecurityService(SecurityService securityService) {
+    this.securityService = securityService;
+  }
+
 
   @Override
   public void afterCompletion(final HttpServletRequest request, final HttpServletResponse response,

http://git-wip-us.apache.org/repos/asf/geode/blob/19376d30/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorJUnitTest.java b/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorJUnitTest.java
index 63d410f..80e26fd 100644
--- a/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorJUnitTest.java
+++ b/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorJUnitTest.java
@@ -14,16 +14,13 @@
  */
 package org.apache.geode.management.internal.web.controllers.support;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-
-import edu.umd.cs.mtc.MultithreadedTestCase;
-import edu.umd.cs.mtc.TestFramework;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.test.junit.categories.UnitTest;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
 import org.jmock.lib.concurrent.Synchroniser;
@@ -33,7 +30,13 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
+import edu.umd.cs.mtc.MultithreadedTestCase;
+import edu.umd.cs.mtc.TestFramework;
 
 /**
  * The LoginHandlerInterceptorJUnitTest class is a test suite of test cases to test the contract and
@@ -83,7 +86,6 @@ public class LoginHandlerInterceptorJUnitTest {
   @Test
   public void testPreHandleAfterCompletion() throws Exception {
     final Map<String, String> requestParameters = new HashMap<>(2);
-    final Map<String, String> requestHeaders = new HashMap<>();
 
     requestParameters.put("parameter", "one");
     requestParameters.put(createEnvironmentVariable("variable"), "two");
@@ -95,8 +97,10 @@ public class LoginHandlerInterceptorJUnitTest {
       {
         oneOf(mockHttpRequest).getParameterNames();
         will(returnValue(enumeration(requestParameters.keySet().iterator())));
-        oneOf(mockHttpRequest).getHeaderNames();
-        will(returnValue(enumeration(requestHeaders.keySet().iterator())));
+        oneOf(mockHttpRequest).getHeader(ResourceConstants.USER_NAME);
+        will(returnValue("admin"));
+        oneOf(mockHttpRequest).getHeader(ResourceConstants.PASSWORD);
+        will(returnValue("password"));
         oneOf(mockHttpRequest).getParameter(with(equal(createEnvironmentVariable("variable"))));
         will(returnValue(requestParameters.get(createEnvironmentVariable("variable"))));
       }
@@ -144,7 +148,6 @@ public class LoginHandlerInterceptorJUnitTest {
       super.initialize();
 
       final Map<String, String> requestParametersOne = new HashMap<>(3);
-      final Map<String, String> requestHeaders = new HashMap<>();
 
       requestParametersOne.put("param", "one");
       requestParametersOne.put(createEnvironmentVariable("STAGE"), "test");
@@ -157,8 +160,10 @@ public class LoginHandlerInterceptorJUnitTest {
         {
           oneOf(mockHttpRequestOne).getParameterNames();
           will(returnValue(enumeration(requestParametersOne.keySet().iterator())));
-          oneOf(mockHttpRequestOne).getHeaderNames();
-          will(returnValue(enumeration(requestHeaders.keySet().iterator())));
+          oneOf(mockHttpRequestOne).getHeader(ResourceConstants.USER_NAME);
+          will(returnValue("admin"));
+          oneOf(mockHttpRequestOne).getHeader(ResourceConstants.PASSWORD);
+          will(returnValue("password"));
           oneOf(mockHttpRequestOne).getParameter(with(equal(createEnvironmentVariable("STAGE"))));
           will(returnValue(requestParametersOne.get(createEnvironmentVariable("STAGE"))));
           oneOf(mockHttpRequestOne)
@@ -180,8 +185,10 @@ public class LoginHandlerInterceptorJUnitTest {
         {
           oneOf(mockHttpRequestTwo).getParameterNames();
           will(returnValue(enumeration(requestParametersTwo.keySet().iterator())));
-          oneOf(mockHttpRequestTwo).getHeaderNames();
-          will(returnValue(enumeration(requestHeaders.keySet().iterator())));
+          oneOf(mockHttpRequestTwo).getHeader(ResourceConstants.USER_NAME);
+          will(returnValue("admin"));
+          oneOf(mockHttpRequestTwo).getHeader(ResourceConstants.PASSWORD);
+          will(returnValue("password"));
           oneOf(mockHttpRequestTwo).getParameter(with(equal(createEnvironmentVariable("HOST"))));
           will(returnValue(requestParametersTwo.get(createEnvironmentVariable("HOST"))));
           oneOf(mockHttpRequestTwo)
@@ -210,6 +217,8 @@ public class LoginHandlerInterceptorJUnitTest {
       assertFalse(env.containsKey("param"));
       assertFalse(env.containsKey("parameter"));
       assertFalse(env.containsKey("HOST"));
+      assertFalse(env.containsKey("security-username"));
+      assertFalse(env.containsKey("security-password"));
       assertEquals("test", env.get("STAGE"));
       assertEquals("/path/to/gemfire/700", env.get("GEODE_HOME"));
 
@@ -222,6 +231,8 @@ public class LoginHandlerInterceptorJUnitTest {
       assertFalse(env.containsKey("param"));
       assertFalse(env.containsKey("parameter"));
       assertFalse(env.containsKey("HOST"));
+      assertFalse(env.containsKey("security-username"));
+      assertFalse(env.containsKey("security-password"));
       assertEquals("test", env.get("STAGE"));
       assertEquals("/path/to/gemfire/700", env.get("GEODE_HOME"));
 
@@ -234,6 +245,8 @@ public class LoginHandlerInterceptorJUnitTest {
       assertFalse(env.containsKey("param"));
       assertFalse(env.containsKey("parameter"));
       assertFalse(env.containsKey("HOST"));
+      assertFalse(env.containsKey("security-username"));
+      assertFalse(env.containsKey("security-password"));
       assertEquals("test", env.get("STAGE"));
       assertEquals("/path/to/gemfire/700", env.get("GEODE_HOME"));
 
@@ -263,6 +276,8 @@ public class LoginHandlerInterceptorJUnitTest {
       assertFalse(env.containsKey("parameter"));
       assertFalse(env.containsKey("param"));
       assertFalse(env.containsKey("STAGE"));
+      assertFalse(env.containsKey("security-username"));
+      assertFalse(env.containsKey("security-password"));
       assertEquals("localhost", env.get("HOST"));
       assertEquals("/path/to/gemfire/75", env.get("GEODE_HOME"));
 

http://git-wip-us.apache.org/repos/asf/geode/blob/19376d30/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorRequestHeaderJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorRequestHeaderJUnitTest.java b/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorRequestHeaderJUnitTest.java
index 118e385..00156cd 100644
--- a/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorRequestHeaderJUnitTest.java
+++ b/geode-web/src/test/java/org/apache/geode/management/internal/web/controllers/support/LoginHandlerInterceptorRequestHeaderJUnitTest.java
@@ -16,23 +16,33 @@ package org.apache.geode.management.internal.web.controllers.support;
 
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.verify;
 
+import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.UnitTest;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.springframework.mock.web.MockHttpServletRequest;
 
 import java.util.Map;
+import java.util.Properties;
 
 @Category(UnitTest.class)
 public class LoginHandlerInterceptorRequestHeaderJUnitTest {
 
+  SecurityService securityService;
+  LoginHandlerInterceptor interceptor;
+
   @Before
   public void before() {
     LoginHandlerInterceptor.getEnvironment().clear();
+    securityService = Mockito.mock(SecurityService.class);
+    interceptor = new LoginHandlerInterceptor();
+    interceptor.setSecurityService(securityService);
   }
 
   @After
@@ -42,21 +52,23 @@ public class LoginHandlerInterceptorRequestHeaderJUnitTest {
 
   @Test
   public void testCaseInsensitive() throws Exception {
-    LoginHandlerInterceptor interceptor = new LoginHandlerInterceptor();
     MockHttpServletRequest mockRequest = new MockHttpServletRequest();
     mockRequest.addHeader("Security-Username", "John");
     mockRequest.addHeader("Security-Password", "Password");
     mockRequest.addHeader("security-something", "anything");
     mockRequest.addHeader("Content-Type", "application/json");
 
+
     interceptor.preHandle(mockRequest, null, null);
-    Map<String, String> env = interceptor.getEnvironment();
 
-    // make sure only security-* are put in the environment variable
-    assertThat(env).hasSize(3);
-    assertThat(env.get("security-username")).isEqualTo("John");
-    assertThat(env.get("security-password")).isEqualTo("Password");
-    assertThat(env.get("security-something")).isEqualTo("anything");
+    ArgumentCaptor<Properties> props = ArgumentCaptor.forClass(Properties.class);
+    verify(securityService).login(props.capture());
+    assertThat(props.getValue().getProperty("security-username")).isEqualTo("John");
+    assertThat(props.getValue().getProperty("security-password")).isEqualTo("Password");
+
+    Map<String, String> env = interceptor.getEnvironment();
+    // make sure security-* are not put in the environment variable
+    assertThat(env).hasSize(0);
   }
 
 }


[10/12] geode git commit: WIP refactoring

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
new file mode 100644
index 0000000..036f6af
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65RealBenchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.ServerLauncher.Builder;
+import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+@Category(IntegrationTest.class)
+public class Put65RealBenchTest {
+
+  private ServerConnection realServerConnection; // set to null in setup(); only handed to spiedInstance(...)
+
+  public Command put65Command;
+  public ServerConnection mockServerConnection;
+  public Message mockMessage;
+
+  private File workingDir;
+  private int serverPort; // assigned in setup() but not read anywhere else in this class
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    this.workingDir = temporaryFolder.getRoot();
+
+    this.serverPort = getRandomAvailablePort(SOCKET);
+
+    ServerLauncher serverLauncher = new ServerLauncher.Builder().setMemberName("server1")
+        .setRedirectOutput(true).setWorkingDirectory(this.workingDir.getAbsolutePath())
+        .set(MCAST_PORT, "0").set(LOCATORS, "").build();
+
+    serverLauncher.start(); // NOTE(review): no matching stop()/@After teardown is visible in this class
+
+    Cache cache = getCache(serverLauncher);
+    CacheServer cacheServer = getCacheServer(cache);
+    AcceptorImpl acceptor = getAcceptorImpl(cacheServer); // not referenced again in this method
+
+    this.realServerConnection = null; // NOTE(review): so spiedInstance(...) below receives null
+
+    this.mockServerConnection = mock(ServerConnection.class,
+        withSettings().name("mockServerConnection").spiedInstance(this.realServerConnection));
+    when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    ClientProxyMembershipID mockProxyId =
+        mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId"));
+    when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId);
+
+    // Message mockErrorResponseMessage =
+    // mock(Message.class, withSettings().name("mockErrorResponseMessage"));
+    // when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(mockErrorResponseMessage);
+
+    Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart"));
+    when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+    Part mockOperationPart = mock(Part.class, withSettings().name("mockOperationPart"));
+    when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE);
+
+    Part mockFlagsPart = mock(Part.class, withSettings().name("mockFlagsPart"));
+    when(mockFlagsPart.getInt()).thenReturn(0);
+
+    Part mockKeyPart = mock(Part.class, withSettings().name("mockKeyPart"));
+    when(mockKeyPart.getObject()).thenReturn("mockKeyPart");
+    when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+    Part mockIsDeltaPart = mock(Part.class, withSettings().name("mockIsDeltaPart"));
+    when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE);
+
+    Part mockValuePart = mock(Part.class, withSettings().name("mockValuePart"));
+    when(mockValuePart.getObject()).thenReturn("mockValuePart");
+
+    Part mockEventPart = mock(Part.class, withSettings().name("mockEventPart"));
+    when(mockEventPart.getObject()).thenReturn("mockEventPart");
+
+    Part mockCallbackArgPart = mock(Part.class, withSettings().name("mockCallbackArgPart"));
+    when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart");
+
+    this.mockMessage = mock(Message.class, withSettings().name("mockMessage"));
+
+    when(this.mockMessage.getTransactionId()).thenReturn(NOTX); // presumably "no transaction" — confirm against TXManagerImpl
+
+    when(this.mockMessage.getPart(0)).thenReturn(mockRegionNamePart); // getPart(0..7) hand back the Part mocks built above
+    when(this.mockMessage.getPart(1)).thenReturn(mockOperationPart);
+    when(this.mockMessage.getPart(2)).thenReturn(mockFlagsPart);
+    when(this.mockMessage.getPart(3)).thenReturn(mockKeyPart);
+    when(this.mockMessage.getPart(4)).thenReturn(mockIsDeltaPart);
+    when(this.mockMessage.getPart(5)).thenReturn(mockValuePart);
+    when(this.mockMessage.getPart(6)).thenReturn(mockEventPart);
+    when(this.mockMessage.getPart(7)).thenReturn(mockCallbackArgPart);
+
+    this.put65Command = Put65.getCommand();
+  }
+
+  @Test
+  public void benchmark() {
+    this.put65Command.execute(this.mockMessage, this.mockServerConnection); // single invocation against the mocks; no assertions follow
+    // Message replyMessage = state.mockServerConnection.getReplyMessage();
+    // blackhole.consume(replyMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
index 0a45494..a0b41b1 100644
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -2880,7 +2880,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
         public void run2() throws CacheException {
           // Set the Test Hook!
           // This test hook will pause during the drain process
-          CacheClientProxy.testHook = new RejectClientReconnectTestHook();
+          CacheClientProxy.setTestHook(new RejectClientReconnectTestHook());
         }
       });
 
@@ -2909,8 +2909,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
           WaitCriterion ev = new WaitCriterion() {
             @Override
             public boolean done() {
-              return CacheClientProxy.testHook != null
-                  && (((RejectClientReconnectTestHook) CacheClientProxy.testHook)
+              return CacheClientProxy.getTestHook() != null
+                  && (((RejectClientReconnectTestHook) CacheClientProxy.getTestHook())
                       .wasClientRejected());
             }
 
@@ -2921,7 +2921,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
           };
           Wait.waitForCriterion(ev, 10 * 1000, 200, true);
           assertTrue(
-              ((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+              ((RejectClientReconnectTestHook) CacheClientProxy.getTestHook()).wasClientRejected());
         }
       });
 
@@ -2958,7 +2958,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
       this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
         @Override
         public void run2() throws CacheException {
-          CacheClientProxy.testHook = null;
+          CacheClientProxy.unsetTestHook();
         }
       });
     }
@@ -3012,7 +3012,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
 
               // Set the Test Hook!
               // This test hook will pause during the drain process
-              CacheClientProxy.testHook = new CqExceptionDueToActivatingClientTestHook();
+              CacheClientProxy.setTestHook(new CqExceptionDueToActivatingClientTestHook());
 
               final CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
               final CacheClientProxy clientProxy = ccnInstance.getClientProxy(durableClientId);
@@ -3072,7 +3072,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
       this.server1VM.invoke(new CacheSerializableRunnable("unset test hook") {
         @Override
         public void run2() throws CacheException {
-          CacheClientProxy.testHook = null;
+          CacheClientProxy.unsetTestHook();
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
index 5533376..5c2620f 100755
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -439,15 +439,15 @@ public class DurableClientTestCase extends JUnit4DistributedTestCase {
         // Find the proxy
         CacheClientProxy proxy = getClientProxy();
         assertNotNull(proxy);
-        assertNotNull(proxy._socket);
+        assertNotNull(proxy.getSocketForTesting());
         long end = System.currentTimeMillis() + 60000;
 
-        while (!proxy._socket.isClosed()) {
+        while (!proxy.getSocketForTesting().isClosed()) {
           if (System.currentTimeMillis() > end) {
             break;
           }
         }
-        assertTrue(proxy._socket.isClosed());
+        assertTrue(proxy.getSocketForTesting().isClosed());
       }
     });
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7a83cccb/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
index d548613..53995df 100755
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -151,19 +151,19 @@ public class Simple2CacheServerDUnitTest extends WANTestBase {
   }
 
   public static void setCacheClientProxyTestHook() {
-    CacheClientProxy.testHook = new CacheClientProxy.TestHook() {
+    CacheClientProxy.setTestHook(new CacheClientProxy.TestHook() {
       @Override
       public void doTestHook(String spot) {
         if (spot.equals("CLIENT_RECONNECTED")) {
           afterProxyReinitialized++;
         }
       }
-    };
+    });
   }
 
   public static void checkResultAndUnsetCacheClientProxyTestHook() {
     // Reinitialize only happened once
-    CacheClientProxy.testHook = null;
+    CacheClientProxy.unsetTestHook();
     assertEquals(1, afterProxyReinitialized);
     afterProxyReinitialized = 0;
   }


[03/12] geode git commit: GEODE-2684 Connection & ConnectionTable cleanup

Posted by kl...@apache.org.
GEODE-2684 Connection & ConnectionTable cleanup

removed another old comment


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/742c8f27
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/742c8f27
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/742c8f27

Branch: refs/heads/feature/GEODE-2632
Commit: 742c8f27ea45ec55445f1047fda3c54b86f89f94
Parents: 6b2b7b2
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 16:49:07 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 16:49:07 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/geode/internal/tcp/Connection.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/742c8f27/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index c57a0ba..70868e0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -3939,9 +3939,9 @@ public class Connection implements Runnable {
       nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
 
       if (oldBuffer != null) {
-        int oldByteCount = oldBuffer.remaining(); // needed to workaround JRockit 1.4.2.04 bug
+        int oldByteCount = oldBuffer.remaining();
         nioInputBuffer.put(oldBuffer);
-        nioInputBuffer.position(oldByteCount); // workaround JRockit 1.4.2.04 bug
+        nioInputBuffer.position(oldByteCount);
         Buffers.releaseReceiveBuffer(oldBuffer, stats);
       }
     } else {