You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/23 00:21:29 UTC

[1/6] geode git commit: Cleanup Message class

Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632-16 c6c2f5c05 -> 4d4305dec


Cleanup Message class


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/971ab8b5
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/971ab8b5
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/971ab8b5

Branch: refs/heads/feature/GEODE-2632-16
Commit: 971ab8b512011a838c1201031ef78c0938e3e6c1
Parents: c6c2f5c
Author: Kirk Lund <kl...@apache.org>
Authored: Mon May 22 13:47:55 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Mon May 22 13:47:55 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/geode/Instantiator.java     | 112 ++--
 .../geode/cache/client/internal/AbstractOp.java |   2 +-
 .../geode/cache/client/internal/PingOp.java     |  10 +-
 .../cache/tier/sockets/CacheClientUpdater.java  |  17 +-
 .../cache/tier/sockets/ChunkedMessage.java      |  19 +-
 .../internal/cache/tier/sockets/Message.java    | 591 ++++++++++---------
 .../cache/tier/sockets/ServerConnection.java    |  65 +-
 .../apache/geode/internal/tcp/Connection.java   |   2 +-
 .../org/apache/geode/internal/util/IOUtils.java |   6 +-
 .../cache/tier/sockets/MessageJUnitTest.java    |  64 +-
 .../internal/JUnit4DistributedTestCase.java     |   2 +-
 ...arallelGatewaySenderOperationsDUnitTest.java |  16 +-
 12 files changed, 448 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/Instantiator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/Instantiator.java b/geode-core/src/main/java/org/apache/geode/Instantiator.java
index 3c1ca06..c727e5b 100644
--- a/geode-core/src/main/java/org/apache/geode/Instantiator.java
+++ b/geode-core/src/main/java/org/apache/geode/Instantiator.java
@@ -20,15 +20,15 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 /**
- * <code>Instantiator</code> allows classes that implement {@link DataSerializable} to be registered
- * with the data serialization framework. Knowledge of <code>DataSerializable</code> classes allows
+ * {@code Instantiator} allows classes that implement {@link DataSerializable} to be registered
+ * with the data serialization framework. Knowledge of {@code DataSerializable} classes allows
  * the framework to optimize how instances of those classes are data serialized.
  *
  * <P>
  *
- * Ordinarily, when a <code>DataSerializable</code> object is written using
+ * Ordinarily, when a {@code DataSerializable} object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, a special marker class id is
- * written to the stream followed by the class name of the <code>DataSerializable</code> object.
+ * written to the stream followed by the class name of the {@code DataSerializable} object.
  * After the marker class id is read by {@link DataSerializer#readObject} it performs the following
  * operations,
  *
@@ -44,23 +44,20 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
  *
  * </OL>
  *
- * However, if a <code>DataSerializable</code> class is {@linkplain #register(Instantiator)
+ * However, if a {@code DataSerializable} class is {@linkplain #register(Instantiator)
  * registered} with the data serialization framework and assigned a unique class id, an important
  * optimization can be performed that avoid the expense of using reflection to instantiate the
- * <code>DataSerializable</code> class. When the object is written using
+ * {@code DataSerializable} class. When the object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, the object's registered class id
  * is written to the stream. Consequently, when the data is read from the stream, the
- * {@link #newInstance} method of the appropriate <code>Instantiator</code> instance is invoked to
- * create an "empty" instance of the <code>DataSerializable</code> instead of using reflection to
+ * {@link #newInstance} method of the appropriate {@code Instantiator} instance is invoked to
+ * create an "empty" instance of the {@code DataSerializable} instead of using reflection to
  * create the new instance.
  *
  * <P>
  *
- * Commonly, a <code>DataSerializable</code> class will register itself with the
- * <code>Instantiator</code> in a static initializer as shown in the below example code.
- *
- * <!-- The source code for the CompanySerializer class resides in tests/com/examples/ds/User.java
- * Please keep the below code snippet in sync with that file. -->
+ * Commonly, a {@code DataSerializable} class will register itself with the
+ * {@code Instantiator} in a static initializer as shown in the below example code.
  *
  * <PRE>
 public class User implements DataSerializable {
@@ -101,22 +98,22 @@ public class User implements DataSerializable {
 }
  * </PRE>
  *
- * <code>Instantiator</code>s may be distributed to other members of the distributed system when
+ * {@code Instantiator}s may be distributed to other members of the distributed system when
  * they are registered. Consider the following scenario in which VM1 and VM2 are members of the same
  * distributed system. Both VMs define the sameRegion and VM2's region replicates the contents of
- * VM1's using replication. VM1 puts an instance of the above <code>User</code> class into the
- * region. The act of instantiating <code>User</code> will load the <code>User</code> class and
- * invoke its static initializer, thus registering the <code>Instantiator</code> with the data
- * serialization framework. Because the region is a replicate, the <code>User</code> will be data
- * serialized and sent to VM2. However, when VM2 attempts to data deserialize the <code>User</code>,
- * its <code>Instantiator</code> will not necessarily be registered because <code>User</code>'s
+ * VM1's using replication. VM1 puts an instance of the above {@code User} class into the
+ * region. The act of instantiating {@code User} will load the {@code User} class and
+ * invoke its static initializer, thus registering the {@code Instantiator} with the data
+ * serialization framework. Because the region is a replicate, the {@code User} will be data
+ * serialized and sent to VM2. However, when VM2 attempts to data deserialize the {@code User},
+ * its {@code Instantiator} will not necessarily be registered because {@code User}'s
  * static initializer may not have been invoked yet. As a result, an exception would be logged while
- * deserializing the <code>User</code> and the replicate would not appear to have the new value. So,
- * in order to ensure that the <code>Instantiator</code> is registered in VM2, the data
- * serialization framework distributes a message to each member when an <code>Instantiator</code> is
+ * deserializing the {@code User} and the replicate would not appear to have the new value. So,
+ * in order to ensure that the {@code Instantiator} is registered in VM2, the data
+ * serialization framework distributes a message to each member when an {@code Instantiator} is
  * {@linkplain #register(Instantiator) registered}.
  * <p>
- * Note that the framework does not require that an <code>Instantiator</code> be
+ * Note that the framework does not require that an {@code Instantiator} be
  * {@link java.io.Serializable}, but it does require that it provide a
  * {@linkplain #Instantiator(Class, int) two-argument constructor}.
  *
@@ -133,63 +130,64 @@ public abstract class Instantiator {
    */
   private Class<? extends DataSerializable> clazz;
 
-  /** The id of this <code>Instantiator</code> */
+  /** The id of this {@code Instantiator} */
   private int id;
 
-  /** The eventId of this <code>Instantiator</code> */
+  /** The eventId of this {@code Instantiator} */
   private EventID eventId;
 
-  /** The originator of this <code>Instantiator</code> */
+  /** The originator of this {@code Instantiator} */
   private ClientProxyMembershipID context;
 
   /**
-   * Registers a <code>DataSerializable</code> class with the data serialization framework. This
+   * Registers a {@code DataSerializable} class with the data serialization framework. This
    * method is usually invoked from the static initializer of a class that implements
-   * <code>DataSerializable</code>.
+   * {@code DataSerializable}.
    *
-   * @param instantiator An <code>Instantiator</code> whose {@link #newInstance} method is invoked
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked
    *        when an object is data deserialized.
    *
-   * @throws IllegalStateException If class <code>c</code> is already registered with a different
-   *         class id, or another class has already been registered with id <code>classId</code>
-   * @throws NullPointerException If <code>instantiator</code> is <code>null</code>.
+   * @throws IllegalStateException If class {@code c} is already registered with a different
+   *         class id, or another class has already been registered with id {@code classId}
+   * @throws NullPointerException If {@code instantiator} is {@code null}.
    */
   public static synchronized void register(Instantiator instantiator) {
     InternalInstantiator.register(instantiator, true);
   }
 
   /**
-   * Registers a <code>DataSerializable</code> class with the data serialization framework. This
+   * Registers a {@code DataSerializable} class with the data serialization framework. This
    * method is usually invoked from the static initializer of a class that implements
-   * <code>DataSerializable</code>.
+   * {@code DataSerializable}.
    *
-   * @param instantiator An <code>Instantiator</code> whose {@link #newInstance} method is invoked
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked
    *        when an object is data deserialized.
    *
-   * @param distribute True if the registered <code>Instantiator</code> has to be distributed to
+   * @param distribute True if the registered {@code Instantiator} has to be distributed to
    *        other members of the distributed system. Note that if distribute is set to false it may
    *        still be distributed in some cases.
    *
-   * @throws IllegalArgumentException If class <code>c</code> is already registered with a different
-   *         class id, or another class has already been registered with id <code>classId</code>
-   * @throws NullPointerException If <code>instantiator</code> is <code>null</code>.
+   * @throws IllegalArgumentException If class {@code c} is already registered with a different
+   *         class id, or another class has already been registered with id {@code classId}
+   * @throws NullPointerException If {@code instantiator} is {@code null}.
    * @deprecated as of 9.0 use {@link Instantiator#register(Instantiator)} instead
    */
+  @Deprecated
   public static synchronized void register(Instantiator instantiator, boolean distribute) {
     InternalInstantiator.register(instantiator, distribute);
   }
 
   /**
-   * Creates a new <code>Instantiator</code> that instantiates a given class.
+   * Creates a new {@code Instantiator} that instantiates a given class.
    *
-   * @param c The <code>DataSerializable</code> class to register. This class must have a static
-   *        initializer that registers this <code>Instantiator</code>.
-   * @param classId A unique id for class <code>c</code>. The <code>classId</code> must not be zero.
-   *        This has been an <code>int</code> since dsPhase1.
+   * @param c The {@code DataSerializable} class to register. This class must have a static
+   *        initializer that registers this {@code Instantiator}.
+   * @param classId A unique id for class {@code c}. The {@code classId} must not be zero.
+   *        This has been an {@code int} since dsPhase1.
    *
-   * @throws IllegalArgumentException If <code>c</code> does not implement
-   *         <code>DataSerializable</code>, <code>classId</code> is less than or equal to zero.
-   * @throws NullPointerException If <code>c</code> is <code>null</code>
+   * @throws IllegalArgumentException If {@code c} does not implement
+   *         {@code DataSerializable}, {@code classId} is less than or equal to zero.
+   * @throws NullPointerException If {@code c} is {@code null}
    */
   public Instantiator(Class<? extends DataSerializable> c, int classId) {
     if (c == null) {
@@ -205,7 +203,7 @@ public abstract class Instantiator {
 
     if (classId == 0) {
       throw new IllegalArgumentException(LocalizedStrings.Instantiator_CLASS_ID_0_MUST_NOT_BE_0
-          .toLocalizedString(Integer.valueOf(classId)));
+          .toLocalizedString(classId));
     }
 
     this.clazz = c;
@@ -213,7 +211,7 @@ public abstract class Instantiator {
   }
 
   /**
-   * Creates a new "empty" instance of a <Code>DataSerializable</code> class whose state will be
+   * Creates a new "empty" instance of a {@code DataSerializable} class whose state will be
    * filled in by invoking its {@link DataSerializable#fromData fromData} method.
    *
    * @see DataSerializer#readObject
@@ -221,29 +219,29 @@ public abstract class Instantiator {
   public abstract DataSerializable newInstance();
 
   /**
-   * Returns the <code>DataSerializable</code> class that is instantiated by this
-   * <code>Instantiator</code>.
+   * Returns the {@code DataSerializable} class that is instantiated by this
+   * {@code Instantiator}.
    */
   public Class<? extends DataSerializable> getInstantiatedClass() {
     return this.clazz;
   }
 
   /**
-   * Returns the unique <code>id</code> of this <code>Instantiator</code>.
+   * Returns the unique {@code id} of this {@code Instantiator}.
    */
   public int getId() {
     return this.id;
   }
 
   /**
-   * sets the unique <code>eventId</code> of this <code>Instantiator</code>. For internal use only.
+   * sets the unique {@code eventId} of this {@code Instantiator}. For internal use only.
    */
   public void setEventId(Object/* EventID */ eventId) {
     this.eventId = (EventID) eventId;
   }
 
   /**
-   * Returns the unique <code>eventId</code> of this <code>Instantiator</code>. For internal use
+   * Returns the unique {@code eventId} of this {@code Instantiator}. For internal use
    * only.
    */
   public Object/* EventID */ getEventId() {
@@ -251,14 +249,14 @@ public abstract class Instantiator {
   }
 
   /**
-   * sets the context of this <code>Instantiator</code>. For internal use only.
+   * sets the context of this {@code Instantiator}. For internal use only.
    */
   public void setContext(Object/* ClientProxyMembershipID */ context) {
     this.context = (ClientProxyMembershipID) context;
   }
 
   /**
-   * Returns the context of this <code>Instantiator</code>. For internal use only.
+   * Returns the context of this {@code Instantiator}. For internal use only.
    */
   public Object/* ClientProxyMembershipID */ getContext() {
     return this.context;

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
index a0cb7d4..7af4f4f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
@@ -228,7 +228,7 @@ public abstract class AbstractOp implements Op {
   protected abstract Object processResponse(Message msg) throws Exception;
 
   /**
-   * Return true of <code>msgType</code> indicates the operation had an error on the server.
+   * Return true of <code>messageType</code> indicates the operation had an error on the server.
    */
   protected abstract boolean isErrorResponse(int msgType);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
index cc30f1c..2e52542 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.cache.client.internal;
 
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -25,6 +24,7 @@ import org.apache.geode.internal.cache.tier.sockets.Message;
  * @since GemFire 5.7
  */
 public class PingOp {
+
   /**
    * Ping the specified server to see if it is still alive
    * 
@@ -47,13 +47,13 @@ public class PingOp {
     /**
      * @throws org.apache.geode.SerializationException if serialization fails
      */
-    public PingOpImpl() {
+    PingOpImpl() {
       super(MessageType.PING, 0);
     }
 
     @Override
     protected void processSecureBytes(Connection cnx, Message message) throws Exception {
-      Message.messageType.set(null);
+      Message.MESSAGE_TYPE.set(null);
     }
 
     @Override
@@ -64,9 +64,9 @@ public class PingOp {
     @Override
     protected void sendMessage(Connection cnx) throws Exception {
       getMessage().clearMessageHasSecurePartFlag();
-      startTime = System.currentTimeMillis();
+      this.startTime = System.currentTimeMillis();
       getMessage().send(false);
-      Message.messageType.set(MessageType.PING);
+      Message.MESSAGE_TYPE.set(MessageType.PING);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index 291db65..7698550 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -572,20 +572,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * the server.
    */
   private Message initializeMessage() {
-    Message _message = new Message(2, Version.CURRENT);
-    try {
-      _message.setComms(socket, in, out, commBuffer, this.stats);
-    } catch (IOException e) {
-      if (!quitting()) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Caught following exception while attempting to initialize a server-to-client communication socket and will exit",
-              this, e);
-        }
-        stopProcessing();
-      }
-    }
-    return _message;
+    Message message = new Message(2, Version.CURRENT);
+    message.setComms(this.socket, this.in, this.out, this.commBuffer, this.stats);
+    return message;
   }
 
   /* refinement of method inherited from Thread */

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
index 2a5a3d7..be30061 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
@@ -22,7 +22,6 @@ import org.apache.geode.internal.logging.LogService;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.apache.logging.log4j.Logger;
@@ -36,7 +35,7 @@ import org.apache.logging.log4j.Logger;
  * 
  * <PRE>
  * 
- * msgType - int - 4 bytes type of message, types enumerated below
+ * messageType - int - 4 bytes type of message, types enumerated below
  * 
  * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained
  * in the payload. Message can be a multi-part message
@@ -153,7 +152,7 @@ public class ChunkedMessage extends Message {
 
   public void setLastChunkAndNumParts(boolean lastChunk, int numParts) {
     setLastChunk(lastChunk);
-    if (this.sc != null && this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) {
+    if (this.serverConnection != null && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
       // we us e three bits for number of parts in last chunk byte
       // we us e three bits for number of parts in last chunk byte
       byte localLastChunk = (byte) (numParts << 5);
@@ -162,7 +161,7 @@ public class ChunkedMessage extends Message {
   }
 
   public void setServerConnection(ServerConnection servConn) {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
   }
 
@@ -209,7 +208,7 @@ public class ChunkedMessage extends Message {
         // Set the header and payload fields only after receiving all the
         // socket data, providing better message consistency in the face
         // of exceptional conditions (e.g. IO problems, timeouts etc.)
-        this.msgType = type;
+        this.messageType = type;
         this.numberOfParts = numParts; // Already set in setPayloadFields via setNumberOfParts
         this.transactionId = txid;
       }
@@ -241,14 +240,14 @@ public class ChunkedMessage extends Message {
     int totalBytesRead = 0;
     do {
       int bytesRead = 0;
-      bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
+      bytesRead = inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
       if (bytesRead == -1) {
         throw new EOFException(
             LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString());
       }
       totalBytesRead += bytesRead;
-      if (this.msgStats != null) {
-        this.msgStats.incReceivedBytes(bytesRead);
+      if (this.messageStats != null) {
+        this.messageStats.incReceivedBytes(bytesRead);
       }
     } while (totalBytesRead < CHUNK_HEADER_LENGTH);
 
@@ -315,7 +314,7 @@ public class ChunkedMessage extends Message {
    * Sends a chunk of this message.
    */
   public void sendChunk(ServerConnection servConn) throws IOException {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
     sendChunk();
   }
@@ -355,7 +354,7 @@ public class ChunkedMessage extends Message {
   protected void getHeaderBytesForWrite() {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    cb.putInt(this.msgType);
+    cb.putInt(this.messageType);
     cb.putInt(this.numberOfParts);
 
     cb.putInt(this.transactionId);

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index f102b2d..354ad0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.apache.geode.internal.util.IOUtils.close;
+
 import org.apache.geode.SerializationException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
@@ -34,7 +36,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
@@ -47,7 +48,7 @@ import java.util.concurrent.TimeUnit;
  * and serialize it out to the wire.
  *
  * <PRE>
- * msgType       - int   - 4 bytes type of message, types enumerated below
+ * messageType       - int   - 4 bytes type of message, types enumerated below
  *
  * msgLength     - int - 4 bytes   total length of variable length payload
  *
@@ -55,10 +56,10 @@ import java.util.concurrent.TimeUnit;
  *                     contained in the payload. Message can
  *                       be a multi-part message
  *
- * transId       - int - 4 bytes  filled in by the requestor, copied back into
+ * transId       - int - 4 bytes  filled in by the requester, copied back into
  *                    the response
  *
- * flags         - byte- 1 byte   filled in by the requestor
+ * flags         - byte- 1 byte   filled in by the requester
  * len1
  * part1
  * .
@@ -76,18 +77,16 @@ import java.util.concurrent.TimeUnit;
  *
  * See also <a href="package-summary.html#messages">package description</a>.
  *
- * @see org.apache.geode.internal.cache.tier.MessageType
- *
+ * @see MessageType
  */
 public class Message {
 
-  public static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
-  /**
-   * maximum size of an outgoing message. See GEODE-478
-   */
-  public static int MAX_MESSAGE_SIZE =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size",
-          DEFAULT_MAX_MESSAGE_SIZE).intValue();
+  // Tentative workaround to avoid OOM stated in #46754.
+  public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>();
+
+  public static final String MAX_MESSAGE_SIZE_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size";
+
+  static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
 
   private static final Logger logger = LogService.getLogger();
 
@@ -97,83 +96,95 @@ public class Message {
 
   private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
 
-  private static final byte[] TRUE;
-  private static final byte[] FALSE;
+  // These two statics are fields shoved into the flags byte for transmission.
+  // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
+  // is left in place
+  private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
+  private static final byte MESSAGE_IS_RETRY = (byte) 0x04;
 
-  static {
+  private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
+
+  private static final int DEFAULT_CHUNK_SIZE = 1024;
+
+  private static final byte[] TRUE = defineTrue();
+  private static final byte[] FALSE = defineFalse();
+
+  private static byte[] defineTrue() {
+    HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
     try {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
       BlobHelper.serializeTo(Boolean.TRUE, hdos);
-      TRUE = hdos.toByteArray();
-    } catch (Exception e) {
+      return hdos.toByteArray();
+    } catch (IOException e) {
       throw new IllegalStateException(e);
+    } finally {
+      close(hdos);
     }
+  }
 
+  private static byte[] defineFalse() {
+    HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
     try {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
       BlobHelper.serializeTo(Boolean.FALSE, hdos);
-      FALSE = hdos.toByteArray();
-    } catch (Exception e) {
+      return hdos.toByteArray();
+    } catch (IOException e) {
       throw new IllegalStateException(e);
+    } finally {
+      close(hdos);
     }
   }
 
-  protected int msgType;
-  protected int payloadLength = 0;
-  protected int numberOfParts = 0;
+  /**
+   * maximum size of an outgoing message. See GEODE-478
+   */
+  private final int maxMessageSize;
+
+  protected int messageType;
+  private int payloadLength = 0;
+  int numberOfParts = 0;
   protected int transactionId = TXManagerImpl.NOTX;
-  protected int currentPart = 0;
-  protected Part[] partsList = null;
-  protected ByteBuffer cachedCommBuffer;
+  int currentPart = 0;
+  private Part[] partsList = null;
+  private ByteBuffer cachedCommBuffer;
   protected Socket socket = null;
-  protected SocketChannel sockCh = null;
-  protected OutputStream os = null;
-  protected InputStream is = null;
-  protected boolean messageModified = true;
+  private SocketChannel socketChannel = null;
+  private OutputStream outputStream = null;
+  protected InputStream inputStream = null;
+  private boolean messageModified = true;
+
   /** is this message a retry of a previously sent message? */
-  protected boolean isRetry;
+  private boolean isRetry;
+
   private byte flags = 0x00;
-  protected MessageStats msgStats = null;
-  protected ServerConnection sc = null;
+  MessageStats messageStats = null;
+  protected ServerConnection serverConnection = null;
   private int maxIncomingMessageLength = -1;
   private Semaphore dataLimiter = null;
-  // private int MAX_MSGS = -1;
-  private Semaphore msgLimiter = null;
-  private boolean hdrRead = false;
-  private int chunkSize = 1024;// Default Chunk Size.
+  private Semaphore messageLimiter = null;
+  private boolean readHeader = false;
+  private int chunkSize = DEFAULT_CHUNK_SIZE;
 
-  protected Part securePart = null;
+  Part securePart = null;
   private boolean isMetaRegion = false;
 
-
-  // These two statics are fields shoved into the flags byte for transmission.
-  // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
-  // is left in place
-  public static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
-  public static final byte MESSAGE_IS_RETRY = (byte) 0x04;
-
-  public static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
-
-  // Tentative workaround to avoid OOM stated in #46754.
-  public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>();
-
-  Version version;
+  private Version version;
 
   /**
    * Creates a new message with the given number of parts
    */
   public Message(int numberOfParts, Version destVersion) {
+    this.maxMessageSize = Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, DEFAULT_MAX_MESSAGE_SIZE);
     this.version = destVersion;
     Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message");
-    partsList = new Part[numberOfParts];
+    this.partsList = new Part[numberOfParts];
     this.numberOfParts = numberOfParts;
-    for (int i = 0; i < partsList.length; i++) {
-      partsList[i] = new Part();
+    int partsListLength = this.partsList.length;
+    for (int i = 0; i < partsListLength; i++) {
+      this.partsList[i] = new Part();
     }
   }
 
   public boolean isSecureMode() {
-    return securePart != null;
+    return this.securePart != null;
   }
 
   public byte[] getSecureBytes() throws IOException, ClassNotFoundException {
@@ -186,7 +197,7 @@ public class Message {
       throw new IllegalArgumentException(
           LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
     }
-    this.msgType = msgType;
+    this.messageType = msgType;
   }
 
   public void setVersion(Version clientVersion) {
@@ -194,17 +205,15 @@ public class Message {
   }
 
   public void setMessageHasSecurePartFlag() {
-    this.flags = (byte) (this.flags | MESSAGE_HAS_SECURE_PART);
+    this.flags |= MESSAGE_HAS_SECURE_PART;
   }
 
   public void clearMessageHasSecurePartFlag() {
-    this.flags = (byte) (this.flags & MESSAGE_HAS_SECURE_PART);
+    this.flags &= MESSAGE_HAS_SECURE_PART;
   }
 
   /**
    * Sets and builds the {@link Part}s that are sent in the payload of the Message
-   * 
-   * @param numberOfParts
    */
   public void setNumberOfParts(int numberOfParts) {
     // hitesh: need to add security header here from server
@@ -227,9 +236,7 @@ public class Message {
   }
 
   /**
-   * For boundary testing we may need to inject mock parts
-   * 
-   * @param parts
+   * For boundary testing we may need to inject mock parts. For testing only.
    */
   void setParts(Part[] parts) {
     this.partsList = parts;
@@ -260,7 +267,7 @@ public class Message {
   /**
    * When building a Message this will return the number of the next Part to be added to the message
    */
-  public int getNextPartNumber() {
+  int getNextPartNumber() {
     return this.currentPart;
   }
 
@@ -268,32 +275,41 @@ public class Message {
     addStringPart(str, false);
   }
 
-  private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<String, byte[]>();
+  private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<>();
 
   public void addStringPart(String str, boolean enableCaching) {
     if (str == null) {
-      addRawPart((byte[]) null, false);
-    } else {
-      Part part = partsList[this.currentPart];
-      if (enableCaching) {
-        byte[] bytes = CACHED_STRINGS.get(str);
-        if (bytes == null) {
-          HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+      addRawPart(null, false);
+      return;
+    }
+
+    Part part = this.partsList[this.currentPart];
+    if (enableCaching) {
+      byte[] bytes = CACHED_STRINGS.get(str);
+      if (bytes == null) {
+        HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+        try {
           bytes = hdos.toByteArray();
           CACHED_STRINGS.put(str, bytes);
+        } finally {
+          close(hdos);
         }
-        part.setPartState(bytes, false);
-      } else {
-        HeapDataOutputStream hdos = new HeapDataOutputStream(str);
-        this.messageModified = true;
-        part.setPartState(hdos, false);
       }
-      this.currentPart++;
+      part.setPartState(bytes, false);
+    } else {
+      HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+      try {
+      this.messageModified = true;
+      part.setPartState(hdos, false);
+      } finally {
+        close(hdos);
+      }
     }
+    this.currentPart++;
   }
 
   /*
-   * Adds a new part to this message that contains a <code>byte</code> array (as opposed to a
+   * Adds a new part to this message that contains a {@code byte} array (as opposed to a
    * serialized object).
    *
    * @see #addPart(byte[], boolean)
@@ -312,13 +328,6 @@ public class Message {
     }
   }
 
-  public void addDeltaPart(HeapDataOutputStream hdos) {
-    this.messageModified = true;
-    Part part = partsList[this.currentPart];
-    part.setPartState(hdos, false);
-    this.currentPart++;
-  }
-
   public void addObjPart(Object o) {
     addObjPart(o, false);
   }
@@ -345,6 +354,9 @@ public class Message {
     }
   }
 
+  /**
+   * Object o is always null
+   */
   public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
     if (o == null) {
       addRawPart((byte[]) o, false);
@@ -353,7 +365,7 @@ public class Message {
     } else if (o instanceof StoredObject) {
       // It is possible it is an off-heap StoredObject that contains a simple non-object byte[].
       this.messageModified = true;
-      Part part = partsList[this.currentPart];
+      Part part = this.partsList[this.currentPart];
       part.setPartState((StoredObject) o, isObject);
       this.currentPart++;
     } else {
@@ -362,59 +374,61 @@ public class Message {
   }
 
   private void serializeAndAddPartNoCopying(Object o) {
-    HeapDataOutputStream hdos;
-    Version v = version;
-    if (version.equals(Version.CURRENT)) {
+    Version v = this.version;
+    if (this.version.equals(Version.CURRENT)) {
       v = null;
     }
+    
     // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources
     // passed to it.
-    hdos = new HeapDataOutputStream(chunkSize, v, true);
+    HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true);
     try {
       BlobHelper.serializeTo(o, hdos);
+      this.messageModified = true;
+      Part part = this.partsList[this.currentPart];
+      part.setPartState(hdos, true);
+      this.currentPart++;
     } catch (IOException ex) {
       throw new SerializationException("failed serializing object", ex);
+    } finally {
+      close(hdos);
     }
-    this.messageModified = true;
-    Part part = partsList[this.currentPart];
-    part.setPartState(hdos, true);
-    this.currentPart++;
-
   }
 
   private void serializeAndAddPart(Object o, boolean zipValues) {
     if (zipValues) {
       throw new UnsupportedOperationException("zipValues no longer supported");
-
-    } else {
-      HeapDataOutputStream hdos;
-      Version v = version;
-      if (version.equals(Version.CURRENT)) {
-        v = null;
-      }
-      hdos = new HeapDataOutputStream(chunkSize, v);
-      try {
-        BlobHelper.serializeTo(o, hdos);
-      } catch (IOException ex) {
-        throw new SerializationException("failed serializing object", ex);
-      }
+    }
+    
+    Version v = this.version;
+    if (this.version.equals(Version.CURRENT)) {
+      v = null;
+    }
+    
+    HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v);
+    try {
+      BlobHelper.serializeTo(o, hdos);
       this.messageModified = true;
-      Part part = partsList[this.currentPart];
+      Part part = this.partsList[this.currentPart];
       part.setPartState(hdos, true);
       this.currentPart++;
+    } catch (IOException ex) {
+      throw new SerializationException("failed serializing object", ex);
+    } finally {
+      close(hdos);
     }
   }
 
   public void addIntPart(int v) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setInt(v);
     this.currentPart++;
   }
 
   public void addLongPart(long v) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setLong(v);
     this.currentPart++;
   }
@@ -424,13 +438,13 @@ public class Message {
    */
   public void addRawPart(byte[] newPart, boolean isObject) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setPartState(newPart, isObject);
     this.currentPart++;
   }
 
   public int getMessageType() {
-    return this.msgType;
+    return this.messageType;
   }
 
   public int getPayloadLength() {
@@ -451,7 +465,7 @@ public class Message {
 
   public Part getPart(int index) {
     if (index < this.numberOfParts) {
-      Part p = partsList[index];
+      Part p = this.partsList[index];
       if (this.version != null) {
         p.setVersion(this.version);
       }
@@ -480,9 +494,9 @@ public class Message {
     if (len != 0) {
       this.payloadLength = 0;
     }
-    if (this.hdrRead) {
-      if (this.msgStats != null) {
-        this.msgStats.decMessagesBeingReceived(len);
+    if (this.readHeader) {
+      if (this.messageStats != null) {
+        this.messageStats.decMessagesBeingReceived(len);
       }
     }
     ByteBuffer buffer = getCommBuffer();
@@ -495,20 +509,18 @@ public class Message {
       this.dataLimiter = null;
       this.maxIncomingMessageLength = 0;
     }
-    if (this.hdrRead) {
-      if (this.msgLimiter != null) {
-        this.msgLimiter.release(1);
-        this.msgLimiter = null;
+    if (this.readHeader) {
+      if (this.messageLimiter != null) {
+        this.messageLimiter.release(1);
+        this.messageLimiter = null;
       }
-      this.hdrRead = false;
+      this.readHeader = false;
     }
     this.flags = 0;
   }
 
   protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
-    // hitesh: setting second bit of flags byte for client
-    // this is not require but this makes all changes easily at client side right now
-    // just see this bit and process security header
+    // setting second bit of flags byte for client this is not require but this makes all changes easily at client side right now just see this bit and process security header
     byte flagsByte = this.flags;
     if (isSecurityHeader) {
       flagsByte |= MESSAGE_HAS_SECURE_PART;
@@ -516,14 +528,14 @@ public class Message {
     if (this.isRetry) {
       flagsByte |= MESSAGE_IS_RETRY;
     }
-    getCommBuffer().putInt(this.msgType).putInt(msgLen).putInt(this.numberOfParts)
-        .putInt(this.transactionId).put(flagsByte);
+    getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts)
+                   .putInt(this.transactionId).put(flagsByte);
   }
 
   protected Part getSecurityPart() {
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // look types right put get etc
-      return this.sc.updateAndGetSecurityPart();
+      return this.serverConnection.updateAndGetSecurityPart();
     }
     return null;
   }
@@ -537,7 +549,7 @@ public class Message {
     this.isMetaRegion = isMetaRegion;
   }
 
-  public boolean getAndResetIsMetaRegion() {
+  boolean getAndResetIsMetaRegion() {
     boolean isMetaRegion = this.isMetaRegion;
     this.isMetaRegion = false;
     return isMetaRegion;
@@ -546,21 +558,20 @@ public class Message {
   /**
    * Sends this message out on its socket.
    */
-  protected void sendBytes(boolean clearMessage) throws IOException {
-    if (this.sc != null) {
+  void sendBytes(boolean clearMessage) throws IOException {
+    if (this.serverConnection != null) {
       // Keep track of the fact that we are making progress.
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
     if (this.socket == null) {
       throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
     }
     try {
-      final ByteBuffer cb = getCommBuffer();
-      if (cb == null) {
+      final ByteBuffer commBuffer = getCommBuffer();
+      if (commBuffer == null) {
         throw new IOException("No buffer");
       }
-      int msgLen = 0;
-      synchronized (cb) {
+      synchronized (commBuffer) {
         long totalPartLen = 0;
         long headerLen = 0;
         int partsToTransmit = this.numberOfParts;
@@ -581,50 +592,50 @@ public class Message {
           partsToTransmit++;
         }
 
-        if ((headerLen + totalPartLen) > Integer.MAX_VALUE) {
+        if (headerLen + totalPartLen > Integer.MAX_VALUE) {
           throw new MessageTooLargeException(
               "Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value");
         }
 
-        msgLen = (int) (headerLen + totalPartLen);
+        int msgLen = (int) (headerLen + totalPartLen);
 
-        if (msgLen > MAX_MESSAGE_SIZE) {
+        if (msgLen > this.maxMessageSize) {
           throw new MessageTooLargeException("Message size (" + msgLen
-              + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
+                                             + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")");
         }
 
-        cb.clear();
-        packHeaderInfoForSending(msgLen, (securityPart != null));
+        commBuffer.clear();
+        packHeaderInfoForSending(msgLen, securityPart != null);
         for (int i = 0; i < partsToTransmit; i++) {
-          Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
+          Part part = i == this.numberOfParts ? securityPart : this.partsList[i];
 
-          if (cb.remaining() < PART_HEADER_SIZE) {
+          if (commBuffer.remaining() < PART_HEADER_SIZE) {
             flushBuffer();
           }
 
           int partLen = part.getLength();
-          cb.putInt(partLen);
-          cb.put(part.getTypeCode());
-          if (partLen <= cb.remaining()) {
-            part.writeTo(cb);
+          commBuffer.putInt(partLen);
+          commBuffer.put(part.getTypeCode());
+          if (partLen <= commBuffer.remaining()) {
+            part.writeTo(commBuffer);
           } else {
             flushBuffer();
-            if (this.sockCh != null) {
-              part.writeTo(this.sockCh, cb);
+            if (this.socketChannel != null) {
+              part.writeTo(this.socketChannel, commBuffer);
             } else {
-              part.writeTo(this.os, cb);
+              part.writeTo(this.outputStream, commBuffer);
             }
-            if (this.msgStats != null) {
-              this.msgStats.incSentBytes(partLen);
+            if (this.messageStats != null) {
+              this.messageStats.incSentBytes(partLen);
             }
           }
         }
-        if (cb.position() != 0) {
+        if (commBuffer.position() != 0) {
           flushBuffer();
         }
         this.messageModified = false;
-        if (this.sockCh == null) {
-          this.os.flush();
+        if (this.socketChannel == null) {
+          this.outputStream.flush();
         }
       }
     } finally {
@@ -634,69 +645,67 @@ public class Message {
     }
   }
 
-  protected void flushBuffer() throws IOException {
+  void flushBuffer() throws IOException {
     final ByteBuffer cb = getCommBuffer();
-    if (this.sockCh != null) {
+    if (this.socketChannel != null) {
       cb.flip();
       do {
-        this.sockCh.write(cb);
+        this.socketChannel.write(cb);
       } while (cb.remaining() > 0);
     } else {
-      this.os.write(cb.array(), 0, cb.position());
+      this.outputStream.write(cb.array(), 0, cb.position());
     }
-    if (this.msgStats != null) {
-      this.msgStats.incSentBytes(cb.position());
+    if (this.messageStats != null) {
+      this.messageStats.incSentBytes(cb.position());
     }
     cb.clear();
   }
 
   private void read() throws IOException {
     clearParts();
-    // TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client
-    // :(
+    // TODO: for server changes make sure sc is not null as this class also used by client
     readHeaderAndPayload();
   }
 
   /**
    * Read the actual bytes of the header off the socket
    */
-  protected void fetchHeader() throws IOException {
+  void fetchHeader() throws IOException {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    // msgType is invalidated here and can be used as an indicator
+    
+    // messageType is invalidated here and can be used as an indicator
     // of problems reading the message
-    this.msgType = MessageType.INVALID;
-
-    int hdr = 0;
+    this.messageType = MessageType.INVALID;
 
     final int headerLength = getHeaderLength();
-    if (this.sockCh != null) {
+    if (this.socketChannel != null) {
       cb.limit(headerLength);
       do {
-        int bytesRead = this.sockCh.read(cb);
-        // System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb);
+        int bytesRead = this.socketChannel.read(cb);
         if (bytesRead == -1) {
           throw new EOFException(
               LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
                   .toLocalizedString());
         }
-        if (this.msgStats != null) {
-          this.msgStats.incReceivedBytes(bytesRead);
+        if (this.messageStats != null) {
+          this.messageStats.incReceivedBytes(bytesRead);
         }
       } while (cb.remaining() > 0);
       cb.flip();
+      
     } else {
+      int hdr = 0;
       do {
-        int bytesRead = -1;
-        bytesRead = this.is.read(cb.array(), hdr, headerLength - hdr);
+        int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - hdr);
         if (bytesRead == -1) {
           throw new EOFException(
               LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
                   .toLocalizedString());
         }
         hdr += bytesRead;
-        if (this.msgStats != null) {
-          this.msgStats.incReceivedBytes(bytesRead);
+        if (this.messageStats != null) {
+          this.messageStats.incReceivedBytes(bytesRead);
         }
       } while (hdr < headerLength);
 
@@ -717,34 +726,36 @@ public class Message {
 
     if (!MessageType.validate(type)) {
       throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER
-          .toLocalizedString(Integer.valueOf(type)));
+          .toLocalizedString(type));
     }
+    
     int timeToWait = 0;
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // Keep track of the fact that a message is being processed.
-      this.sc.setProcessingMessage();
-      timeToWait = sc.getClientReadTimeout();
+      this.serverConnection.setProcessingMessage();
+      timeToWait = this.serverConnection.getClientReadTimeout();
     }
-    this.hdrRead = true;
-    if (this.msgLimiter != null) {
+    this.readHeader = true;
+    
+    if (this.messageLimiter != null) {
       for (;;) {
-        this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+        this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
         boolean interrupted = Thread.interrupted();
         try {
           if (timeToWait == 0) {
-            this.msgLimiter.acquire(1);
+            this.messageLimiter.acquire(1);
           } else {
-            if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
-              if (this.msgStats != null && this.msgStats instanceof CacheServerStats) {
-                ((CacheServerStats) this.msgStats).incConnectionsTimedOut();
+            if (!this.messageLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
+              if (this.messageStats instanceof CacheServerStats) {
+                ((CacheServerStats) this.messageStats).incConnectionsTimedOut();
               }
               throw new IOException(
                   LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS
-                      .toLocalizedString(Integer.valueOf(timeToWait)));
+                      .toLocalizedString(timeToWait));
             }
           }
           break;
-        } catch (InterruptedException e) {
+        } catch (InterruptedException ignore) {
           interrupted = true;
         } finally {
           if (interrupted) {
@@ -753,16 +764,19 @@ public class Message {
         }
       } // for
     }
+    
     if (len > 0) {
       if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
         throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1
-            .toLocalizedString(new Object[] {Integer.valueOf(len),
-                Integer.valueOf(this.maxIncomingMessageLength)}));
+            .toLocalizedString(new Object[] {
+              len, this.maxIncomingMessageLength
+            }));
       }
+      
       if (this.dataLimiter != null) {
         for (;;) {
-          if (sc != null) {
-            this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+          if (this.serverConnection != null) {
+            this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
           }
           boolean interrupted = Thread.interrupted();
           try {
@@ -770,21 +784,21 @@ public class Message {
               this.dataLimiter.acquire(len);
             } else {
               int newTimeToWait = timeToWait;
-              if (this.msgLimiter != null) {
+              if (this.messageLimiter != null) {
                 // may have waited for msg limit so recalc time to wait
-                newTimeToWait -= (int) sc.getCurrentMessageProcessingTime();
+                newTimeToWait -= (int) this.serverConnection.getCurrentMessageProcessingTime();
               }
               if (newTimeToWait <= 0
-                  || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
+                  || !this.messageLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
                 throw new IOException(
                     LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS
                         .toLocalizedString(timeToWait));
               }
             }
-            this.payloadLength = len; // makes sure payloadLength gets set now so we will release
-                                      // the semaphore
+            // makes sure payloadLength gets set now so we will release the semaphore
+            this.payloadLength = len;
             break; // success
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ignore) {
             interrupted = true;
           } finally {
             if (interrupted) {
@@ -794,15 +808,15 @@ public class Message {
         }
       }
     }
-    if (this.msgStats != null) {
-      this.msgStats.incMessagesBeingReceived(len);
+    if (this.messageStats != null) {
+      this.messageStats.incMessagesBeingReceived(len);
       this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
     }
 
     this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
-    bits = (byte) (bits & MESSAGE_IS_RETRY_MASK);
+    bits &= MESSAGE_IS_RETRY_MASK;
     this.flags = bits;
-    this.msgType = type;
+    this.messageType = type;
 
     readPayloadFields(numParts, len);
 
@@ -813,32 +827,38 @@ public class Message {
     // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts
     this.transactionId = txid;
     this.flags = bits;
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // Keep track of the fact that a message is being processed.
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
   }
 
-  protected void readPayloadFields(final int numParts, final int len) throws IOException {
+  /**
+   * TODO: refactor overly long method readPayloadFields
+   */
+  void readPayloadFields(final int numParts, final int len) throws IOException {
     if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) {
       throw new IOException(
           LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT
-              .toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)}));
+              .toLocalizedString(new Object[] { len, numParts }));
     }
 
-    Integer msgType = messageType.get();
+    Integer msgType = MESSAGE_TYPE.get();
     if (msgType != null && msgType == MessageType.PING) {
-      messageType.set(null); // set it to null right away.
-      int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping
-                          // operation.
+      // set it to null right away.
+      MESSAGE_TYPE.set(null);
+      // Some number which will not throw OOM but still be acceptable for a ping operation.
+      int pingParts = 10;
       if (numParts > pingParts) {
         throw new IOException("Part length ( " + numParts + " ) is  inconsistent for "
             + MessageType.getString(msgType) + " operation.");
       }
     }
+    
     setNumberOfParts(numParts);
-    if (numParts <= 0)
+    if (numParts <= 0) {
       return;
+    }
 
     if (len < 0) {
       logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len));
@@ -849,12 +869,10 @@ public class Message {
     cb.clear();
     cb.flip();
 
-    int readSecurePart = 0;
-    readSecurePart = checkAndSetSecurityPart();
+    int readSecurePart = checkAndSetSecurityPart();
 
     int bytesRemaining = len;
-    for (int i = 0; ((i < numParts + readSecurePart)
-        || ((readSecurePart == 1) && (cb.remaining() > 0))); i++) {
+    for (int i = 0; i < numParts + readSecurePart || readSecurePart == 1 && cb.remaining() > 0; i++) {
       int bytesReadThisTime = readPartChunk(bytesRemaining);
       bytesRemaining -= bytesReadThisTime;
 
@@ -869,6 +887,7 @@ public class Message {
       int partLen = cb.getInt();
       byte partType = cb.get();
       byte[] partBytes = null;
+      
       if (partLen > 0) {
         partBytes = new byte[partLen];
         int alreadyReadBytes = cb.remaining();
@@ -878,26 +897,27 @@ public class Message {
           }
           cb.get(partBytes, 0, alreadyReadBytes);
         }
+        
         // now we need to read partLen - alreadyReadBytes off the wire
         int off = alreadyReadBytes;
         int remaining = partLen - off;
         while (remaining > 0) {
-          if (this.sockCh != null) {
+          if (this.socketChannel != null) {
             int bytesThisTime = remaining;
             cb.clear();
             if (bytesThisTime > cb.capacity()) {
               bytesThisTime = cb.capacity();
             }
             cb.limit(bytesThisTime);
-            int res = this.sockCh.read(cb);
+            int res = this.socketChannel.read(cb);
             if (res != -1) {
               cb.flip();
               bytesRemaining -= res;
               remaining -= res;
               cb.get(partBytes, off, res);
               off += res;
-              if (this.msgStats != null) {
-                this.msgStats.incReceivedBytes(res);
+              if (this.messageStats != null) {
+                this.messageStats.incReceivedBytes(res);
               }
             } else {
               throw new EOFException(
@@ -905,14 +925,13 @@ public class Message {
                       .toLocalizedString());
             }
           } else {
-            int res = 0;
-            res = this.is.read(partBytes, off, remaining);
+            int res = this.inputStream.read(partBytes, off, remaining);
             if (res != -1) {
               bytesRemaining -= res;
               remaining -= res;
               off += res;
-              if (this.msgStats != null) {
-                this.msgStats.incReceivedBytes(res);
+              if (this.messageStats != null) {
+                this.messageStats.incReceivedBytes(res);
               }
             } else {
               throw new EOFException(
@@ -941,35 +960,38 @@ public class Message {
    * @return the number of bytes read into commBuffer
    */
   private int readPartChunk(int bytesRemaining) throws IOException {
-    final ByteBuffer cb = getCommBuffer();
-    if (cb.remaining() >= PART_HEADER_SIZE) {
+    final ByteBuffer commBuffer = getCommBuffer();
+    if (commBuffer.remaining() >= PART_HEADER_SIZE) {
       // we already have the next part header in commBuffer so just return
       return 0;
     }
-    if (cb.position() != 0) {
-      cb.compact();
+    
+    if (commBuffer.position() != 0) {
+      commBuffer.compact();
     } else {
-      cb.position(cb.limit());
-      cb.limit(cb.capacity());
+      commBuffer.position(commBuffer.limit());
+      commBuffer.limit(commBuffer.capacity());
     }
-    int bytesRead = 0;
-    if (this.sc != null) {
+    
+    if (this.serverConnection != null) {
       // Keep track of the fact that we are making progress
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
-    if (this.sockCh != null) {
-      int remaining = cb.remaining();
+    int bytesRead = 0;
+    
+    if (this.socketChannel != null) {
+      int remaining = commBuffer.remaining();
       if (remaining > bytesRemaining) {
         remaining = bytesRemaining;
-        cb.limit(cb.position() + bytesRemaining);
+        commBuffer.limit(commBuffer.position() + bytesRemaining);
       }
       while (remaining > 0) {
-        int res = this.sockCh.read(cb);
+        int res = this.socketChannel.read(commBuffer);
         if (res != -1) {
           remaining -= res;
           bytesRead += res;
-          if (this.msgStats != null) {
-            this.msgStats.incReceivedBytes(res);
+          if (this.messageStats != null) {
+            this.messageStats.incReceivedBytes(res);
           }
         } else {
           throw new EOFException(
@@ -979,21 +1001,20 @@ public class Message {
       }
 
     } else {
-      int bufSpace = cb.capacity() - cb.position();
-      int bytesToRead = bufSpace;
+      int bytesToRead = commBuffer.capacity() - commBuffer.position();
       if (bytesRemaining < bytesToRead) {
         bytesToRead = bytesRemaining;
       }
-      int pos = cb.position();
+      int pos = commBuffer.position();
+      
       while (bytesToRead > 0) {
-        int res = 0;
-        res = this.is.read(cb.array(), pos, bytesToRead);
+        int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead);
         if (res != -1) {
           bytesToRead -= res;
           pos += res;
           bytesRead += res;
-          if (this.msgStats != null) {
-            this.msgStats.incReceivedBytes(res);
+          if (this.messageStats != null) {
+            this.messageStats.incReceivedBytes(res);
           }
         } else {
           throw new EOFException(
@@ -1001,9 +1022,10 @@ public class Message {
                   .toLocalizedString());
         }
       }
-      cb.position(pos);
+      
+      commBuffer.position(pos);
     }
-    cb.flip();
+    commBuffer.flip();
     return bytesRead;
   }
 
@@ -1011,40 +1033,39 @@ public class Message {
    * Gets rid of all the parts that have been added to this message.
    */
   public void clearParts() {
-    for (int i = 0; i < partsList.length; i++) {
-      partsList[i].clear();
+    for (Part part : this.partsList) {
+      part.clear();
     }
     this.currentPart = 0;
   }
 
   @Override
   public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("type=").append(MessageType.getString(msgType));
-    sb.append("; payloadLength=").append(payloadLength);
-    sb.append("; numberOfParts=").append(numberOfParts);
-    sb.append("; transactionId=").append(transactionId);
-    sb.append("; currentPart=").append(currentPart);
-    sb.append("; messageModified=").append(messageModified);
-    sb.append("; flags=").append(Integer.toHexString(flags));
-    for (int i = 0; i < numberOfParts; i++) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("type=").append(MessageType.getString(this.messageType));
+    sb.append("; payloadLength=").append(this.payloadLength);
+    sb.append("; numberOfParts=").append(this.numberOfParts);
+    sb.append("; transactionId=").append(this.transactionId);
+    sb.append("; currentPart=").append(this.currentPart);
+    sb.append("; messageModified=").append(this.messageModified);
+    sb.append("; flags=").append(Integer.toHexString(this.flags));
+    for (int i = 0; i < this.numberOfParts; i++) {
       sb.append("; part[").append(i).append("]={");
-      sb.append(this.partsList[i].toString());
+      sb.append(this.partsList[i]);
       sb.append("}");
     }
     return sb.toString();
   }
 
-
-  public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats)
+  void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats)
       throws IOException {
-    this.sc = sc;
+    this.serverConnection = sc;
     setComms(socket, bb, msgStats);
   }
 
-  public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
-    this.sockCh = socket.getChannel();
-    if (this.sockCh == null) {
+  void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+    this.socketChannel = socket.getChannel();
+    if (this.socketChannel == null) {
       setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats);
     } else {
       setComms(socket, null, null, bb, msgStats);
@@ -1052,14 +1073,14 @@ public class Message {
   }
 
   public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb,
-      MessageStats msgStats) throws IOException {
+      MessageStats msgStats) {
     Assert.assertTrue(socket != null);
     this.socket = socket;
-    this.sockCh = socket.getChannel();
-    this.is = is;
-    this.os = os;
+    this.socketChannel = socket.getChannel();
+    this.inputStream = is;
+    this.outputStream = os;
     this.cachedCommBuffer = bb;
-    this.msgStats = msgStats;
+    this.messageStats = msgStats;
   }
 
   /**
@@ -1069,11 +1090,11 @@ public class Message {
    */
   public void unsetComms() {
     this.socket = null;
-    this.sockCh = null;
-    this.is = null;
-    this.os = null;
+    this.socketChannel = null;
+    this.inputStream = null;
+    this.outputStream = null;
     this.cachedCommBuffer = null;
-    this.msgStats = null;
+    this.messageStats = null;
   }
 
   /**
@@ -1084,7 +1105,7 @@ public class Message {
   }
 
   public void send(ServerConnection servConn) throws IOException {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
     send(true);
   }
@@ -1097,7 +1118,7 @@ public class Message {
   }
 
   /**
-   * Populates the stats of this <code>Message</code> with information received via its socket
+   * Populates the stats of this {@code Message} with information received via its socket
    */
   public void recv() throws IOException {
     if (this.socket != null) {
@@ -1111,10 +1132,10 @@ public class Message {
 
   public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter,
       Semaphore msgLimiter) throws IOException {
-    this.sc = sc;
+    this.serverConnection = sc;
     this.maxIncomingMessageLength = maxMessageLength;
     this.dataLimiter = dataLimiter;
-    this.msgLimiter = msgLimiter;
+    this.messageLimiter = msgLimiter;
     recv();
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 83d0e9d..dfda14f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -723,12 +723,7 @@ public class ServerConnection implements Runnable {
     ThreadState threadState = null;
     try {
       if (msg != null) {
-        // this.logger.fine("donormalMsg() msgType " + msg.getMessageType());
-        // Since this thread is not interrupted when the cache server is
-        // shutdown,
-        // test again after a message has been read. This is a bit of a hack. I
-        // think this thread should be interrupted, but currently AcceptorImpl
-        // doesn't keep track of the threads that it launches.
+        // Since this thread is not interrupted when the cache server is shutdown, test again after a message has been read. This is a bit of a hack. I think this thread should be interrupted, but currently AcceptorImpl doesn't keep track of the threads that it launches.
         if (!this.processMessages || (crHelper.isShutdown())) {
           if (logger.isDebugEnabled()) {
             logger.debug("{} ignoring message of type {} from client {} due to shutdown.",
@@ -1078,8 +1073,6 @@ public class ServerConnection implements Runnable {
    */
   public Part updateAndGetSecurityPart() {
     // need to take care all message types here
-    // this.logger.fine("getSecurityPart() msgType = "
-    // + this.requestMsg.msgType);
     if (AcceptorImpl.isAuthenticationRequired()
         && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
         && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
@@ -1090,40 +1083,40 @@ public class ServerConnection implements Runnable {
       if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) {
         logger.debug(
             "ServerConnection.updateAndGetSecurityPart() not adding security part for msg type {}",
-            MessageType.getString(this.requestMsg.msgType));
+            MessageType.getString(this.requestMsg.messageType));
       }
     }
     return null;
   }
 
   private boolean isInternalMessage() {
-    return (this.requestMsg.msgType == MessageType.CLIENT_READY
-        || this.requestMsg.msgType == MessageType.CLOSE_CONNECTION
-        || this.requestMsg.msgType == MessageType.GETCQSTATS_MSG_TYPE
-        || this.requestMsg.msgType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
-        || this.requestMsg.msgType == MessageType.GET_CLIENT_PR_METADATA
-        || this.requestMsg.msgType == MessageType.INVALID
-        || this.requestMsg.msgType == MessageType.MAKE_PRIMARY
-        || this.requestMsg.msgType == MessageType.MONITORCQ_MSG_TYPE
-        || this.requestMsg.msgType == MessageType.PERIODIC_ACK
-        || this.requestMsg.msgType == MessageType.PING
-        || this.requestMsg.msgType == MessageType.REGISTER_DATASERIALIZERS
-        || this.requestMsg.msgType == MessageType.REGISTER_INSTANTIATORS
-        || this.requestMsg.msgType == MessageType.REQUEST_EVENT_VALUE
-        || this.requestMsg.msgType == MessageType.ADD_PDX_TYPE
-        || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_TYPE
-        || this.requestMsg.msgType == MessageType.GET_PDX_TYPE_BY_ID
-        || this.requestMsg.msgType == MessageType.SIZE
-        || this.requestMsg.msgType == MessageType.TX_FAILOVER
-        || this.requestMsg.msgType == MessageType.TX_SYNCHRONIZATION
-        || this.requestMsg.msgType == MessageType.GET_FUNCTION_ATTRIBUTES
-        || this.requestMsg.msgType == MessageType.ADD_PDX_ENUM
-        || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_ENUM
-        || this.requestMsg.msgType == MessageType.GET_PDX_ENUM_BY_ID
-        || this.requestMsg.msgType == MessageType.GET_PDX_TYPES
-        || this.requestMsg.msgType == MessageType.GET_PDX_ENUMS
-        || this.requestMsg.msgType == MessageType.COMMIT
-        || this.requestMsg.msgType == MessageType.ROLLBACK);
+    return (this.requestMsg.messageType == MessageType.CLIENT_READY
+        || this.requestMsg.messageType == MessageType.CLOSE_CONNECTION
+        || this.requestMsg.messageType == MessageType.GETCQSTATS_MSG_TYPE
+        || this.requestMsg.messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
+        || this.requestMsg.messageType == MessageType.GET_CLIENT_PR_METADATA
+        || this.requestMsg.messageType == MessageType.INVALID
+        || this.requestMsg.messageType == MessageType.MAKE_PRIMARY
+        || this.requestMsg.messageType == MessageType.MONITORCQ_MSG_TYPE
+        || this.requestMsg.messageType == MessageType.PERIODIC_ACK
+        || this.requestMsg.messageType == MessageType.PING
+        || this.requestMsg.messageType == MessageType.REGISTER_DATASERIALIZERS
+        || this.requestMsg.messageType == MessageType.REGISTER_INSTANTIATORS
+        || this.requestMsg.messageType == MessageType.REQUEST_EVENT_VALUE
+        || this.requestMsg.messageType == MessageType.ADD_PDX_TYPE
+        || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_TYPE
+        || this.requestMsg.messageType == MessageType.GET_PDX_TYPE_BY_ID
+        || this.requestMsg.messageType == MessageType.SIZE
+        || this.requestMsg.messageType == MessageType.TX_FAILOVER
+        || this.requestMsg.messageType == MessageType.TX_SYNCHRONIZATION
+        || this.requestMsg.messageType == MessageType.GET_FUNCTION_ATTRIBUTES
+        || this.requestMsg.messageType == MessageType.ADD_PDX_ENUM
+        || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_ENUM
+        || this.requestMsg.messageType == MessageType.GET_PDX_ENUM_BY_ID
+        || this.requestMsg.messageType == MessageType.GET_PDX_TYPES
+        || this.requestMsg.messageType == MessageType.GET_PDX_ENUMS
+        || this.requestMsg.messageType == MessageType.COMMIT
+        || this.requestMsg.messageType == MessageType.ROLLBACK);
   }
 
   public void run() {

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 4e450c7..1afe6ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2149,7 +2149,7 @@ public class Connection implements Runnable {
                 logger.fatal(LocalizedMessage
                     .create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex);
               }
-            } else /* (msgType == END_CHUNKED_MSG_TYPE) */ {
+            } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
               MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
               this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
               try {

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
index 031f827..80b16fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
@@ -30,8 +30,7 @@ import java.io.ObjectStreamClass;
 
 /**
  * Reusable Input/Output operation utility methods.
- * <p/>
- * 
+ *
  * @since GemFire 6.6
  */
 @SuppressWarnings("unused")
@@ -44,8 +43,7 @@ public abstract class IOUtils {
    * File.separator character. If the pathname is unspecified (null, empty or blank) then path
    * elements are considered relative to file system root, beginning with File.separator. If array
    * of path elements are null, then the pathname is returned as is.
-   * </p>
-   * 
+   *
    * @param pathname a String value indicating the base pathname.
    * @param pathElements the path elements to append to pathname.
    * @return the path elements appended to the pathname.

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
index 86fcbce..b2d903c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -32,53 +32,49 @@ import org.apache.geode.test.junit.categories.UnitTest;
 public class MessageJUnitTest {
 
   private Message message;
-  private Socket mockSocket;
-  private MessageStats mockStats;
-  private ByteBuffer msgBuffer;
-  private ServerConnection mockServerConnection;
 
   @Before
   public void setUp() throws Exception {
-    mockSocket = mock(Socket.class);
-    message = new Message(2, Version.CURRENT);
-    assertEquals(2, message.getNumberOfParts());
-    mockStats = mock(MessageStats.class);
-    msgBuffer = ByteBuffer.allocate(1000);
-    mockServerConnection = mock(ServerConnection.class);
-    message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
+    Socket mockSocket = mock(Socket.class);
+    this.message = new Message(2, Version.CURRENT);
+    assertEquals(2, this.message.getNumberOfParts());
+    MessageStats mockStats = mock(MessageStats.class);
+    ByteBuffer msgBuffer = ByteBuffer.allocate(1000);
+    ServerConnection mockServerConnection = mock(ServerConnection.class);
+    this.message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
   }
 
   @Test
   public void clearDoesNotThrowNPE() throws Exception {
     // unsetComms clears the message's ByteBuffer, which was causing an NPE during shutdown
     // when clear() was invoked
-    message.unsetComms();
-    message.clear();
+    this.message.unsetComms();
+    this.message.clear();
   }
 
   @Test
   public void numberOfPartsIsAdjusted() {
-    int numParts = message.getNumberOfParts();
-    message.setNumberOfParts(2 * numParts + 1);
-    assertEquals(2 * numParts + 1, message.getNumberOfParts());
-    message.addBytesPart(new byte[1]);
-    message.addIntPart(2);
-    message.addLongPart(3);
-    message.addObjPart("4");
-    message.addStringPart("5");
-    assertEquals(5, message.getNextPartNumber());
+    int numParts = this.message.getNumberOfParts();
+    this.message.setNumberOfParts(2 * numParts + 1);
+    assertEquals(2 * numParts + 1, this.message.getNumberOfParts());
+    this.message.addBytesPart(new byte[1]);
+    this.message.addIntPart(2);
+    this.message.addLongPart(3);
+    this.message.addObjPart("4");
+    this.message.addStringPart("5");
+    assertEquals(5, this.message.getNextPartNumber());
   }
 
   @Test
   public void messageLongerThanMaxIntIsRejected() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
     when(mockPart1.getLength()).thenReturn(Integer.MAX_VALUE / 2);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
+    this.message.setParts(parts);
     try {
-      message.send();
+      this.message.send();
       fail("expected an exception but none was thrown");
     } catch (MessageTooLargeException e) {
       assertTrue(e.getMessage().contains("exceeds maximum integer value"));
@@ -87,14 +83,14 @@ public class MessageJUnitTest {
 
   @Test
   public void maxMessageSizeIsRespected() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
-    when(mockPart1.getLength()).thenReturn(Message.MAX_MESSAGE_SIZE / 2);
+    when(mockPart1.getLength()).thenReturn(Message.DEFAULT_MAX_MESSAGE_SIZE / 2);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
+    this.message.setParts(parts);
     try {
-      message.send();
+      this.message.send();
       fail("expected an exception but none was thrown");
     } catch (MessageTooLargeException e) {
       assertFalse(e.getMessage().contains("exceeds maximum integer value"));
@@ -103,21 +99,17 @@ public class MessageJUnitTest {
 
   /**
    * geode-1468: Message should clear the chunks in its Parts when performing cleanup.
-   * 
-   * @throws Exception
    */
   @Test
   public void streamBuffersAreClearedDuringCleanup() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
     when(mockPart1.getLength()).thenReturn(100);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
-    message.clearParts();
+    this.message.setParts(parts);
+    this.message.clearParts();
     verify(mockPart1, times(2)).clear();
   }
 
-  // TODO many more tests are needed
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
index 5a679bb..110d649 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
@@ -600,11 +600,11 @@ public abstract class JUnit4DistributedTestCase implements DistributedTestFixtur
     RegionTestCase.preSnapshotRegion = null;
     SocketCreator.resetHostNameCache();
     SocketCreator.resolve_dns = true;
-    Message.MAX_MESSAGE_SIZE = Message.DEFAULT_MAX_MESSAGE_SIZE;
 
     // clear system properties -- keep alphabetized
     System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "log-level");
     System.clearProperty("jgroups.resolve_dns");
+    System.clearProperty(Message.MAX_MESSAGE_SIZE_PROPERTY);
 
     if (InternalDistributedSystem.systemAttemptingReconnect != null) {
       InternalDistributedSystem.systemAttemptingReconnect.stopReconnecting();

http://git-wip-us.apache.org/repos/asf/geode/blob/971ab8b5/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index f403447..8cedbf0 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
 import static org.apache.geode.test.dunit.Assert.*;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -31,8 +33,8 @@ import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 
 /**
  * DUnit test for operations on ParallelGatewaySender
@@ -40,6 +42,9 @@ import org.apache.geode.test.junit.categories.FlakyTest;
 @Category(DistributedTest.class)
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+
   @Override
   protected final void postSetUpWANTestBase() throws Exception {
     IgnoredException.addIgnoredException("Broken pipe||Unexpected IOException");
@@ -582,13 +587,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
   @Test
   public void testParallelGatewaySenderMessageTooLargeException() {
+    vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024)));
+
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
     Integer nyPort = locatorPorts[1];
 
     // Create and start sender with reduced maximum message size and 1 dispatcher thread
     String regionName = getTestMethodName() + "_PR";
-    vm4.invoke(() -> setMaximumMessageSize(1024 * 1024));
     vm4.invoke(() -> createCache(lnPort));
     vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1));
     vm4.invoke(() -> createSender("ln", 2, true, 100, 100, false, false, null, false));
@@ -617,12 +623,6 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     ignoredGIOE.remove();
   }
 
-  private void setMaximumMessageSize(int maximumMessageSizeBytes) {
-    Message.MAX_MESSAGE_SIZE = maximumMessageSizeBytes;
-    LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: "
-        + System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"));
-  }
-
   private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
       boolean createAccessors, boolean startSenders) {
     // Note: This is a test-specific method used by several test to create


[4/6] geode git commit: Cleanup CacheClientUpdater

Posted by kl...@apache.org.
Cleanup CacheClientUpdater


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f13ceee6
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f13ceee6
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f13ceee6

Branch: refs/heads/feature/GEODE-2632-16
Commit: f13ceee61416cf16821f51b02ed2af8dc7cb109b
Parents: 971ab8b
Author: Kirk Lund <kl...@apache.org>
Authored: Mon May 22 14:49:21 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Mon May 22 14:49:21 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/geode/Instantiator.java     |  99 ++-
 .../internal/cache/tier/CachedRegionHelper.java |  18 +-
 .../cache/tier/sockets/CacheClientUpdater.java  | 850 ++++++++++---------
 .../cache/tier/sockets/ChunkedMessage.java      |   6 +-
 .../internal/cache/tier/sockets/Message.java    |  61 +-
 .../cache/tier/sockets/ServerConnection.java    |   5 +-
 ...arallelGatewaySenderOperationsDUnitTest.java |   3 +-
 7 files changed, 528 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/Instantiator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/Instantiator.java b/geode-core/src/main/java/org/apache/geode/Instantiator.java
index c727e5b..ea42057 100644
--- a/geode-core/src/main/java/org/apache/geode/Instantiator.java
+++ b/geode-core/src/main/java/org/apache/geode/Instantiator.java
@@ -20,16 +20,16 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 /**
- * {@code Instantiator} allows classes that implement {@link DataSerializable} to be registered
- * with the data serialization framework. Knowledge of {@code DataSerializable} classes allows
- * the framework to optimize how instances of those classes are data serialized.
+ * {@code Instantiator} allows classes that implement {@link DataSerializable} to be registered with
+ * the data serialization framework. Knowledge of {@code DataSerializable} classes allows the
+ * framework to optimize how instances of those classes are data serialized.
  *
  * <P>
  *
  * Ordinarily, when a {@code DataSerializable} object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, a special marker class id is
- * written to the stream followed by the class name of the {@code DataSerializable} object.
- * After the marker class id is read by {@link DataSerializer#readObject} it performs the following
+ * written to the stream followed by the class name of the {@code DataSerializable} object. After
+ * the marker class id is read by {@link DataSerializer#readObject} it performs the following
  * operations,
  *
  * <OL>
@@ -44,20 +44,20 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
  *
  * </OL>
  *
- * However, if a {@code DataSerializable} class is {@linkplain #register(Instantiator)
- * registered} with the data serialization framework and assigned a unique class id, an important
- * optimization can be performed that avoid the expense of using reflection to instantiate the
+ * However, if a {@code DataSerializable} class is {@linkplain #register(Instantiator) registered}
+ * with the data serialization framework and assigned a unique class id, an important optimization
+ * can be performed that avoid the expense of using reflection to instantiate the
  * {@code DataSerializable} class. When the object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, the object's registered class id
  * is written to the stream. Consequently, when the data is read from the stream, the
- * {@link #newInstance} method of the appropriate {@code Instantiator} instance is invoked to
- * create an "empty" instance of the {@code DataSerializable} instead of using reflection to
- * create the new instance.
+ * {@link #newInstance} method of the appropriate {@code Instantiator} instance is invoked to create
+ * an "empty" instance of the {@code DataSerializable} instead of using reflection to create the new
+ * instance.
  *
  * <P>
  *
- * Commonly, a {@code DataSerializable} class will register itself with the
- * {@code Instantiator} in a static initializer as shown in the below example code.
+ * Commonly, a {@code DataSerializable} class will register itself with the {@code Instantiator} in
+ * a static initializer as shown in the below example code.
  *
  * <PRE>
 public class User implements DataSerializable {
@@ -98,20 +98,19 @@ public class User implements DataSerializable {
 }
  * </PRE>
  *
- * {@code Instantiator}s may be distributed to other members of the distributed system when
- * they are registered. Consider the following scenario in which VM1 and VM2 are members of the same
+ * {@code Instantiator}s may be distributed to other members of the distributed system when they are
+ * registered. Consider the following scenario in which VM1 and VM2 are members of the same
  * distributed system. Both VMs define the sameRegion and VM2's region replicates the contents of
- * VM1's using replication. VM1 puts an instance of the above {@code User} class into the
- * region. The act of instantiating {@code User} will load the {@code User} class and
- * invoke its static initializer, thus registering the {@code Instantiator} with the data
- * serialization framework. Because the region is a replicate, the {@code User} will be data
- * serialized and sent to VM2. However, when VM2 attempts to data deserialize the {@code User},
- * its {@code Instantiator} will not necessarily be registered because {@code User}'s
- * static initializer may not have been invoked yet. As a result, an exception would be logged while
- * deserializing the {@code User} and the replicate would not appear to have the new value. So,
- * in order to ensure that the {@code Instantiator} is registered in VM2, the data
- * serialization framework distributes a message to each member when an {@code Instantiator} is
- * {@linkplain #register(Instantiator) registered}.
+ * VM1's using replication. VM1 puts an instance of the above {@code User} class into the region.
+ * The act of instantiating {@code User} will load the {@code User} class and invoke its static
+ * initializer, thus registering the {@code Instantiator} with the data serialization framework.
+ * Because the region is a replicate, the {@code User} will be data serialized and sent to VM2.
+ * However, when VM2 attempts to data deserialize the {@code User}, its {@code Instantiator} will
+ * not necessarily be registered because {@code User}'s static initializer may not have been invoked
+ * yet. As a result, an exception would be logged while deserializing the {@code User} and the
+ * replicate would not appear to have the new value. So, in order to ensure that the
+ * {@code Instantiator} is registered in VM2, the data serialization framework distributes a message
+ * to each member when an {@code Instantiator} is {@linkplain #register(Instantiator) registered}.
  * <p>
  * Note that the framework does not require that an {@code Instantiator} be
  * {@link java.io.Serializable}, but it does require that it provide a
@@ -140,15 +139,15 @@ public abstract class Instantiator {
   private ClientProxyMembershipID context;
 
   /**
-   * Registers a {@code DataSerializable} class with the data serialization framework. This
-   * method is usually invoked from the static initializer of a class that implements
+   * Registers a {@code DataSerializable} class with the data serialization framework. This method
+   * is usually invoked from the static initializer of a class that implements
    * {@code DataSerializable}.
    *
-   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked
-   *        when an object is data deserialized.
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked when
+   *        an object is data deserialized.
    *
-   * @throws IllegalStateException If class {@code c} is already registered with a different
-   *         class id, or another class has already been registered with id {@code classId}
+   * @throws IllegalStateException If class {@code c} is already registered with a different class
+   *         id, or another class has already been registered with id {@code classId}
    * @throws NullPointerException If {@code instantiator} is {@code null}.
    */
   public static synchronized void register(Instantiator instantiator) {
@@ -156,16 +155,16 @@ public abstract class Instantiator {
   }
 
   /**
-   * Registers a {@code DataSerializable} class with the data serialization framework. This
-   * method is usually invoked from the static initializer of a class that implements
+   * Registers a {@code DataSerializable} class with the data serialization framework. This method
+   * is usually invoked from the static initializer of a class that implements
    * {@code DataSerializable}.
    *
-   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked
-   *        when an object is data deserialized.
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked when
+   *        an object is data deserialized.
    *
-   * @param distribute True if the registered {@code Instantiator} has to be distributed to
-   *        other members of the distributed system. Note that if distribute is set to false it may
-   *        still be distributed in some cases.
+   * @param distribute True if the registered {@code Instantiator} has to be distributed to other
+   *        members of the distributed system. Note that if distribute is set to false it may still
+   *        be distributed in some cases.
    *
    * @throws IllegalArgumentException If class {@code c} is already registered with a different
    *         class id, or another class has already been registered with id {@code classId}
@@ -182,11 +181,11 @@ public abstract class Instantiator {
    *
    * @param c The {@code DataSerializable} class to register. This class must have a static
    *        initializer that registers this {@code Instantiator}.
-   * @param classId A unique id for class {@code c}. The {@code classId} must not be zero.
-   *        This has been an {@code int} since dsPhase1.
+   * @param classId A unique id for class {@code c}. The {@code classId} must not be zero. This has
+   *        been an {@code int} since dsPhase1.
    *
-   * @throws IllegalArgumentException If {@code c} does not implement
-   *         {@code DataSerializable}, {@code classId} is less than or equal to zero.
+   * @throws IllegalArgumentException If {@code c} does not implement {@code DataSerializable},
+   *         {@code classId} is less than or equal to zero.
    * @throws NullPointerException If {@code c} is {@code null}
    */
   public Instantiator(Class<? extends DataSerializable> c, int classId) {
@@ -202,8 +201,8 @@ public abstract class Instantiator {
     }
 
     if (classId == 0) {
-      throw new IllegalArgumentException(LocalizedStrings.Instantiator_CLASS_ID_0_MUST_NOT_BE_0
-          .toLocalizedString(classId));
+      throw new IllegalArgumentException(
+          LocalizedStrings.Instantiator_CLASS_ID_0_MUST_NOT_BE_0.toLocalizedString(classId));
     }
 
     this.clazz = c;
@@ -211,16 +210,15 @@ public abstract class Instantiator {
   }
 
   /**
-   * Creates a new "empty" instance of a {@code DataSerializable} class whose state will be
-   * filled in by invoking its {@link DataSerializable#fromData fromData} method.
+   * Creates a new "empty" instance of a {@code DataSerializable} class whose state will be filled
+   * in by invoking its {@link DataSerializable#fromData fromData} method.
    *
    * @see DataSerializer#readObject
    */
   public abstract DataSerializable newInstance();
 
   /**
-   * Returns the {@code DataSerializable} class that is instantiated by this
-   * {@code Instantiator}.
+   * Returns the {@code DataSerializable} class that is instantiated by this {@code Instantiator}.
    */
   public Class<? extends DataSerializable> getInstantiatedClass() {
     return this.clazz;
@@ -241,8 +239,7 @@ public abstract class Instantiator {
   }
 
   /**
-   * Returns the unique {@code eventId} of this {@code Instantiator}. For internal use
-   * only.
+   * Returns the unique {@code eventId} of this {@code Instantiator}. For internal use only.
    */
   public Object/* EventID */ getEventId() {
     return this.eventId;

http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
index a82a804..940da95 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
@@ -27,21 +27,19 @@ import org.apache.geode.internal.cache.InternalCache;
 public class CachedRegionHelper {
 
   private final InternalCache cache;
+
   private volatile boolean shutdown = false;
-  // private Map regions;
-  private volatile int slowEmulationSleep = 0;
 
-  public CachedRegionHelper(InternalCache c) {
-    this.cache = c;
-    // this.regions = new WeakHashMap();
+  public CachedRegionHelper(InternalCache cache) {
+    this.cache = cache;
   }
 
   public void checkCancelInProgress(Throwable e) throws CancelException {
-    cache.getCancelCriterion().checkCancelInProgress(e);
+    this.cache.getCancelCriterion().checkCancelInProgress(e);
   }
 
   public Region getRegion(String name) {
-    return cache.getRegion(name);
+    return this.cache.getRegion(name);
   }
 
   public InternalCache getCache() {
@@ -53,12 +51,14 @@ public class CachedRegionHelper {
   }
 
   public boolean isShutdown() {
-    return shutdown || cache.getCancelCriterion().isCancelInProgress();
+    return this.shutdown || this.cache.getCancelCriterion().isCancelInProgress();
   }
 
+  /**
+   * CachedRegionHelper#close() does nothing
+   */
   public void close() {
     // cache = null;
-    // regions = null;
   }
 
   /**


[2/6] geode git commit: Cleanup CacheClientUpdater

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 8cedbf0..702e6c8 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -43,7 +43,8 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
   @Rule
-  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
 
   @Override
   protected final void postSetUpWANTestBase() throws Exception {


[3/6] geode git commit: Cleanup CacheClientUpdater

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index 7698550..8915c55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -94,9 +94,9 @@ import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.security.GemFireSecurityException;
 
 /**
- * <code>CacheClientUpdater</code> is a thread that processes update messages from a cache server
- * and {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local
- * cache based on the contents of those messages.
+ * {@code CacheClientUpdater} is a thread that processes update messages from a cache server and
+ * {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local cache
+ * based on the contents of those messages.
  * 
  * @since GemFire 3.5
  */
@@ -104,6 +104,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+
   /**
    * true if the constructor successfully created a connection. If false, the run method for this
    * thread immediately exits.
@@ -129,6 +131,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * The input stream of the socket
    */
   private final InputStream in;
+
   /**
    * Failed updater from the endpoint previously known as the primary
    */
@@ -139,12 +142,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private final ByteBuffer commBuffer;
 
-  private boolean commBufferReleased;
+  private boolean commBufferReleased; // TODO: fix synchronization
 
   private final CCUStats stats;
 
   /**
-   * Cache for which we provide service
+   * Cache for which we provide service TODO: lifecycle and synchronization need work
    */
   private /* final */ InternalCache cache;
 
@@ -175,18 +178,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private boolean isOpCompleted;
 
-  public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
+  public static final String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
 
   /**
-   * to enable test flag
+   * to enable test flag TODO: eliminate isUsedByTest
    */
   public static boolean isUsedByTest;
 
   /**
    * Indicates if full value was requested from server as a result of failure in applying delta
-   * bytes.
+   * bytes. TODO: only used for test assertion
    */
-  public static boolean fullValueRequested = false;
+  static boolean fullValueRequested = false;
 
   private final ServerLocation location;
 
@@ -195,8 +198,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   private EndpointManager eManager = null;
   private Endpoint endpoint = null;
 
-  static private final long MAX_CACHE_WAIT = Long
-      .getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120).longValue(); // seconds
+  private static final long MAX_CACHE_WAIT =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120); // seconds
 
   /**
    * Return true if cache appears
@@ -231,7 +234,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       boolean interrupted = Thread.interrupted();
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         interrupted = true;
       } finally {
         if (interrupted) {
@@ -245,12 +248,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   }
 
   /**
-   * Creates a new <code>CacheClientUpdater</code> with a given name that waits for a server to
-   * connect on a given port.
+   * Creates a new {@code CacheClientUpdater} with a given name that waits for a server to connect
+   * on a given port.
    *
    * @param name descriptive name, used for our ThreadGroup
    * @param location the endpoint we represent
-   * @param primary true if our endpoint is primary TODO ask the ep for this?
+   * @param primary true if our endpoint is primary
    * @param ids the system we are distributing messages through
    * 
    * @throws AuthenticationRequiredException when client is not configured to send credentials using
@@ -265,6 +268,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       Endpoint endpoint, int handshakeTimeout, SocketCreator socketCreator)
       throws AuthenticationRequiredException, AuthenticationFailedException,
       ServerRefusedConnectionException {
+
     super(LoggingThreadGroup.createThreadGroup("Client update thread"), name);
     this.setDaemon(true);
     this.system = (InternalDistributedSystem) ids;
@@ -276,6 +280,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     this.eManager = eManager;
     this.endpoint = endpoint;
     this.stats = new CCUStats(this.system, this.location);
+
     // Create the connection...
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
@@ -291,7 +296,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     try {
       // Size of the server-to-client communication socket buffers
       int socketBufferSize =
-          Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
+          Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", DEFAULT_SOCKET_BUFFER_SIZE);
 
       mySock = socketCreator.connectForClient(location.getHostName(), location.getPort(),
           handshakeTimeout, socketBufferSize);
@@ -327,31 +332,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         }
       }
 
-      {
-        int bufSize = 1024;
-        try {
-          bufSize = mySock.getSendBufferSize();
-          if (bufSize < 1024) {
-            bufSize = 1024;
-          }
-        } catch (SocketException ignore) {
+      int bufSize = 1024;
+      try {
+        bufSize = mySock.getSendBufferSize();
+        if (bufSize < 1024) {
+          bufSize = 1024;
         }
-        cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
-      }
-      {
-        // create a "server" memberId we currently don't know much about the
-        // server.
-        // Would be nice for it to send us its member id
-        // TODO: change the serverId to use the endpoint's getMemberId() which
-        // returns a
-        // DistributedMember (once gfecq branch is merged to trunk).
-        MemberAttributes ma =
-            new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
-        sid = new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true,
-            ma);
+      } catch (SocketException ignore) {
       }
+      cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
+
+      // create a "server" memberId we currently don't know much about the server.
+      // Would be nice for it to send us its member id
+      // TODO: change the serverId to use the endpoint's getMemberId() which returns a
+      // DistributedMember (once gfecq branch is merged to trunk).
+      MemberAttributes ma =
+          new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
+      sid =
+          new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, ma);
+
       success = true;
-    } catch (ConnectException e) {
+    } catch (ConnectException ignore) {
       if (!quitting()) {
         logger.warn(LocalizedMessage
             .create(LocalizedStrings.CacheClientUpdater_0_CONNECTION_WAS_REFUSED, this));
@@ -385,20 +386,22 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             e.getMessage()));
       }
     } finally {
-      connected = success;
+      this.connected = success;
       if (mySock != null) {
         try {
           mySock.setSoTimeout(0);
-        } catch (SocketException e) {
+        } catch (SocketException ignore) {
           // ignore: nothing we can do about this
         }
       }
-      if (connected) {
-        socket = mySock;
-        out = tmpOut;
-        in = tmpIn;
-        serverId = sid;
-        commBuffer = cb;
+
+      if (this.connected) {
+        this.socket = mySock;
+        this.out = tmpOut;
+        this.in = tmpIn;
+        this.serverId = sid;
+        this.commBuffer = cb;
+
         // Don't want the timeout after handshake
         if (mySock != null) {
           try {
@@ -406,12 +409,13 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           } catch (SocketException ignore) {
           }
         }
+
       } else {
-        socket = null;
-        serverId = null;
-        commBuffer = null;
-        out = null;
-        in = null;
+        this.socket = null;
+        this.serverId = null;
+        this.commBuffer = null;
+        this.out = null;
+        this.in = null;
 
         if (mySock != null) {
           try {
@@ -439,29 +443,31 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   }
 
   public boolean isConnected() {
-    return connected;
+    return this.connected;
   }
 
+  @Override
   public boolean isPrimary() {
-    return isPrimary;
+    return this.isPrimary;
   }
 
   public InternalLogWriter getSecurityLogger() {
     return this.qManager.getSecurityLogger();
   }
 
+  @Override
   public void setFailedUpdater(ClientUpdater failedUpdater) {
     this.failedUpdater = failedUpdater;
   }
 
   /**
-   * Performs the work of the client update thread. Creates a <code>ServerSocket</code> and waits
-   * for the server to connect to it.
+   * Performs the work of the client update thread. Creates a {@code ServerSocket} and waits for the
+   * server to connect to it.
    */
   @Override
   public void run() {
+    EntryLogger.setSource(this.serverId, "RI");
     boolean addedListener = false;
-    EntryLogger.setSource(serverId, "RI");
     try {
       this.system.addDisconnectListener(this);
       addedListener = true;
@@ -472,8 +478,10 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         return;
       }
       processMessages();
-    } catch (CancelException e) {
-      return; // just bail
+
+    } catch (CancelException ignore) {
+      // just bail
+
     } finally {
       if (addedListener) {
         this.system.removeDisconnectListener(this);
@@ -486,8 +494,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Notifies this thread to stop processing
    */
-  protected void stopProcessing() {
-    continueProcessing.set(false);// = false;
+  private void stopProcessing() {
+    this.continueProcessing.set(false);
   }
 
   /**
@@ -495,39 +503,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * duplicates. Note: this method is not named stop because this is a Thread which has a deprecated
    * stop method.
    */
-  public void stopUpdater() {
+  private void stopUpdater() {
     boolean isSelfDestroying = Thread.currentThread() == this;
-
     stopProcessing();
+
     // need to also close the socket for this interrupt to wakeup
     // the thread. This fixes bug 35691.
-    // this.close(); // this should not be done here.
 
     if (this.isAlive()) {
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Stopping {}", this.location, this);
       }
+
       if (!isSelfDestroying) {
         interrupt();
         try {
-          if (socket != null) {
-            socket.close();
+          if (this.socket != null) {
+            this.socket.close();
           }
-        } catch (VirtualMachineError err) {
-          SystemFailure.initiateFailure(err);
-          // If this ever returns, rethrow the error. We're poisoned
-          // now, so don't let this thread continue.
-          throw err;
-        } catch (Throwable t) {
-          // Whenever you catch Error or Throwable, you must also
-          // catch VirtualMachineError (see above). However, there is
-          // _still_ a possibility that you are dealing with a cascading
-          // error condition, so you also need to check to see if the JVM
-          // is still usable:
-          SystemFailure.checkFailure();
-          // dont care...
+        } catch (IOException e) {
           if (logger.isDebugEnabled()) {
-            logger.debug(t.getMessage(), t);
+            logger.debug(e.getMessage(), e);
           }
         }
       } // !isSelfDestroying
@@ -537,32 +533,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Signals the run thread to stop, closes underlying resources.
    */
+  @Override
   public void close() {
-    this.continueProcessing.set(false);// = false; // signals we are done.
+    this.continueProcessing.set(false); // signals we are done.
 
-    // Close the socket
-    // This will also cause the underlying streams to fail.
+    // Close the socket. This will also cause the underlying streams to fail.
     try {
-      if (socket != null) {
-        socket.close();
+      if (this.socket != null) {
+        this.socket.close();
       }
-    } catch (Exception e) {
+    } catch (IOException ignore) {
       // ignore
     }
 
-    try {
-      this.stats.close();
-    } catch (Exception e) {
-      // ignore
-    }
+    this.stats.close();
 
     // close the helper
-    try {
-      if (cacheHelper != null) {
-        cacheHelper.close();
-      }
-    } catch (Exception e) {
-      // ignore
+    if (this.cacheHelper != null) {
+      this.cacheHelper.close();
     }
     releaseCommBuffer();
   }
@@ -580,22 +568,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /* refinement of method inherited from Thread */
   @Override
   public String toString() {
-    return this.getName() + " (" + this.location.getHostName() + ":" + this.location.getPort()
-        + ")";
+    return getName() + " (" + this.location.getHostName() + ':' + this.location.getPort() + ')';
   }
 
   /**
    * Handle a marker message
    * 
-   * @param m message containing the data
+   * @param clientMessage message containing the data
    */
-  private void handleMarker(Message m) {
+  private void handleMarker(Message clientMessage) {
     try {
       final boolean isDebugEnabled = logger.isDebugEnabled();
       if (isDebugEnabled) {
-        logger.debug("Received marker message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received marker message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
+
       this.qManager.getState().processMarker();
+
       if (isDebugEnabled) {
         logger.debug("Processed marker message");
       }
@@ -610,41 +600,40 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Create or update an entry
    * 
-   * @param m message containing the data
+   * @param clientMessage message containing the data
    */
-  private void handleUpdate(Message m) {
+  private void handleUpdate(Message clientMessage) {
     String regionName = null;
     Object key = null;
     Part valuePart = null;
-    Object newValue = null;
-    byte[] deltaBytes = null;
-    Object fullValue = null;
-    boolean isValueObject = false;
-    int partCnt = 0;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
+
       // Retrieve the data from the put message parts
       if (isDebugEnabled) {
-        logger.debug("Received put message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received put message of length ({} bytes)", clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      boolean isDeltaSent = ((Boolean) m.getPart(partCnt++).getObject()).booleanValue();
-      valuePart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      boolean isDeltaSent = (Boolean) clientMessage.getPart(partCnt++).getObject();
+      valuePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
-      EventID eventId = (EventID) m.getPart(m.getNumberOfParts() - 1).getObject();
+      EventID eventId =
+          (EventID) clientMessage.getPart(clientMessage.getNumberOfParts() - 1).getObject();
 
-      boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
-      boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+      boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+      boolean withCQs = (Boolean) hasCqsPart.getObject();
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
@@ -655,30 +644,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       // object, it will be stored as a CachedDeserializable and
       // deserialized only when requested.
 
-      boolean isCreate = (m.getMessageType() == MessageType.LOCAL_CREATE);
+      boolean isCreate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE;
+
       if (isDebugEnabled) {
-        logger
-            .debug(
-                "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
-                regionName, key, isCreate,
-                (valuePart.isObject() ? new StringBuilder(" value: ")
-                    .append(deserialize(valuePart.getSerializedForm())) : ""),
-                callbackArgument, withInterest, withCQs, eventId, versionTag);
+        logger.debug(
+            "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
+            regionName, key, isCreate,
+            valuePart.isObject()
+                ? new StringBuilder(" value: ").append(deserialize(valuePart.getSerializedForm()))
+                : "",
+            callbackArgument, withInterest, withCQs, eventId, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
+      Object newValue = null;
+      byte[] deltaBytes = null;
+      Object fullValue = null;
+      boolean isValueObject;
 
       if (!isDeltaSent) {
         // bug #42162 - must check for a serialized null here
         byte[] serializedForm = valuePart.getSerializedForm();
+
         if (isCreate && InternalDataSerializer.isSerializedNull(serializedForm)) {
           // newValue = null; newValue is already null
         } else {
           newValue = valuePart.getSerializedForm();
         }
+
         if (withCQs) {
           fullValue = valuePart.getObject();
         }
+
         isValueObject = valuePart.isObject();
       } else {
         deltaBytes = valuePart.getSerializedForm();
@@ -689,40 +687,49 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         if (isDebugEnabled && !quitting()) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
         }
+
       } else if (region.hasServerProxy() && ServerResponseMatrix
-          .checkForValidStateAfterNotification(region, key, m.getMessageType())
+          .checkForValidStateAfterNotification(region, key, clientMessage.getMessageType())
           && (withInterest || !withCQs)) {
         @Released
         EntryEventImpl newEvent = null;
+
         try {
           // Create an event and put the entry
           newEvent = EntryEventImpl.create(region,
-              ((m.getMessageType() == MessageType.LOCAL_CREATE) ? Operation.CREATE
-                  : Operation.UPDATE),
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE ? Operation.CREATE
+                  : Operation.UPDATE,
               key, null /* newValue */, callbackArgument /* callbackArg */, true /* originRemote */,
               eventId.getDistributedMember());
+
           newEvent.setVersionTag(versionTag);
           newEvent.setFromServer(true);
+
           region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, deltaBytes,
-              isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
-              qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+              isValueObject, callbackArgument,
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+              this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+              eventId);
+
           this.isOpCompleted = true;
+
           // bug 45520 - ConcurrentCacheModificationException is not thrown and we must check this
           // flag
-          // if (newEvent.isConcurrencyConflict()) {
-          // return; // this is logged elsewhere at fine level
-          // }
           if (withCQs && isDeltaSent) {
             fullValue = newEvent.getNewValue();
           }
-        } catch (InvalidDeltaException ide) {
+        } catch (InvalidDeltaException ignore) {
           Part fullValuePart = requestFullValue(eventId, "Caught InvalidDeltaException.");
           region.getCachePerfStats().incDeltaFullValuesRequested();
-          fullValue = newValue = fullValuePart.getObject();
-          isValueObject = Boolean.valueOf(fullValuePart.isObject());
+          fullValue = newValue = fullValuePart.getObject(); // TODO: fix this line
+          isValueObject = fullValuePart.isObject();
+
           region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, null,
-              isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
-              qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+              isValueObject, callbackArgument,
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+              this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+              eventId);
+
           this.isOpCompleted = true;
         } finally {
           if (newEvent != null)
@@ -737,20 +744,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
       // Update CQs. CQs can exist without client region.
       if (withCQs) {
-        Part numCqsPart = m.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, fullValue,
-            deltaBytes, eventId);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), key, fullValue, deltaBytes, eventId);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_PUT_ENTRY_REGION_0_KEY_1_VALUE_2
-              .toLocalizedString(
-                  new Object[] {regionName, key, deserialize(valuePart.getSerializedForm())});
+              .toLocalizedString(regionName, key, deserialize(valuePart.getSerializedForm()));
       handleException(message, e);
     }
   }
@@ -763,12 +769,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     if (isDebugEnabled) {
       logger.debug("{} Requesting full value...", reason);
     }
-    Part result = (Part) GetEventValueOp.executeOnPrimary(qManager.getPool(), eventId, null);
+    Part result = (Part) GetEventValueOp.executeOnPrimary(this.qManager.getPool(), eventId, null);
 
     if (result == null) {
       // Just log a warning. Do not stop CCU thread.
+      // TODO: throw a subclass of Exception
       throw new Exception("Could not retrieve full value for " + eventId);
     }
+
     if (isDebugEnabled) {
       logger.debug("Full value received.");
     }
@@ -778,39 +786,41 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Invalidate an entry
    * 
-   * @param m message describing the entry
+   * @param clientMessage message describing the entry
    */
-  private void handleInvalidate(Message m) {
+  private void handleInvalidate(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
+
       // Retrieve the data from the local-invalidate message parts
       if (isDebugEnabled) {
-        logger.debug("Received invalidate message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received invalidate message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
 
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
 
       Object callbackArgument = callbackArgumentPart.getObject();
-      boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
-      boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+      boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+      boolean withCQs = (Boolean) hasCqsPart.getObject();
 
       if (isDebugEnabled) {
         logger.debug(
@@ -818,34 +828,36 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             regionName, key, callbackArgument, withInterest, withCQs, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
+
       } else {
         if (region.hasServerProxy() && (withInterest || !withCQs)) {
           try {
-            Part eid = m.getPart(m.getNumberOfParts() - 1);
+            Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
             EventID eventId = (EventID) eid.getObject();
+
             try {
               region.basicBridgeClientInvalidate(eventId.getDistributedMember(), key,
                   callbackArgument,
-                  qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+                  this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
                   versionTag);
-            } catch (ConcurrentCacheModificationException e) {
-              // return; allow CQs to be processed
+            } catch (ConcurrentCacheModificationException ignore) {
+              // allow CQs to be processed
             }
+
             this.isOpCompleted = true;
             // fix for 36615
-            qManager.getState().incrementInvalidatedStats();
+            this.qManager.getState().incrementInvalidatedStats();
 
             if (isDebugEnabled) {
               logger.debug("Invalidated entry for region: {} key: {} callbackArgument: {}",
                   regionName, key, callbackArgument);
             }
-          } catch (EntryNotFoundException e) {
-            /* ignore */
+          } catch (EntryNotFoundException ignore) {
             if (isDebugEnabled && !quitting()) {
               logger.debug("Already invalidated entry for region: {} key: {} callbackArgument: {}",
                   regionName, key, callbackArgument);
@@ -858,19 +870,20 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       if (withCQs) {
         // The client may have been registered to receive invalidates for
         // create and updates operations. Get the actual region operation.
-        Part regionOpType = m.getPart(partCnt++);
-        Part numCqsPart = m.getPart(partCnt++);
+        Part regionOpType = clientMessage.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), regionOpType.getInt(), key, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), regionOpType.getInt(),
+            key, null);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       final String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_INVALIDATE_ENTRY_REGION_0_KEY_1
-              .toLocalizedString(new Object[] {regionName, key});
+              .toLocalizedString(regionName, key);
       handleException(message, e);
     }
   }
@@ -878,26 +891,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * locally destroy an entry
    * 
-   * @param m message describing the entry
+   * @param clientMessage message describing the entry
    */
-  private void handleDestroy(Message m) {
+  private void handleDestroy(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
       // Retrieve the data from the local-destroy message parts
       if (isDebugEnabled) {
-        logger.debug("Received destroy message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received destroy message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
@@ -905,8 +919,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
 
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
       boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
@@ -918,30 +932,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             regionName, key, callbackArgument, withInterest, withCQs, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
-      EventID eventId = null;
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
+
       } else if (region.hasServerProxy() && (withInterest || !withCQs)) {
+        EventID eventId = null;
         try {
-          Part eid = m.getPart(m.getNumberOfParts() - 1);
+          Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
           eventId = (EventID) eid.getObject();
+
           try {
             region.basicBridgeClientDestroy(eventId.getDistributedMember(), key, callbackArgument,
-                qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+                this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
                 versionTag);
-          } catch (ConcurrentCacheModificationException e) {
-            // return; allow CQs to be processed
+          } catch (ConcurrentCacheModificationException ignore) {
+            // allow CQs to be processed
           }
+
           this.isOpCompleted = true;
           if (isDebugEnabled) {
             logger.debug("Destroyed entry for region: {} key: {} callbackArgument: {}", regionName,
                 key, callbackArgument);
           }
-        } catch (EntryNotFoundException e) {
-          /* ignore */
+        } catch (EntryNotFoundException ignore) {
           if (isDebugEnabled && !quitting()) {
             logger.debug(
                 "Already destroyed entry for region: {} key: {} callbackArgument: {} eventId={}",
@@ -952,18 +968,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       }
 
       if (withCQs) {
-        Part numCqsPart = m.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), key, null);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_DESTROY_ENTRY_REGION_0_KEY_1
-              .toLocalizedString(new Object[] {regionName, key});
+              .toLocalizedString(regionName, key);
       handleException(message, e);
     }
   }
@@ -971,44 +988,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Locally destroy a region
    * 
-   * @param m message describing the region
+   * @param clientMessage message describing the region
    */
-  private void handleDestroyRegion(Message m) {
-    Part regionNamePart = null, callbackArgumentPart = null;
+  private void handleDestroyRegion(Message clientMessage) {
     String regionName = null;
-    Object callbackArgument = null;
-    LocalRegion region = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the local-destroy-region message parts
       if (isDebugEnabled) {
-        logger.debug("Received destroy region message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received destroy region message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
-      regionNamePart = m.getPart(partCnt++);
-      callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
       regionName = regionNamePart.getString();
-      callbackArgument = callbackArgumentPart.getObject();
+      Object callbackArgument = callbackArgumentPart.getObject();
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       if (isDebugEnabled) {
         logger.debug("Destroying region: {} callbackArgument: {}", regionName, callbackArgument);
       }
 
       // Handle CQs if any on this region.
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        // TODO: partCnt is unused -- does processCqs have side effects
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
@@ -1025,7 +1042,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           logger.debug("Destroyed region: {} callbackArgument: {}", regionName, callbackArgument);
         }
       }
-    } catch (RegionDestroyedException e) { // already destroyed
+    } catch (RegionDestroyedException ignore) { // already destroyed
       if (isDebugEnabled) {
         logger.debug("region already destroyed: {}", regionName);
       }
@@ -1040,24 +1057,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Locally clear a region
    * 
-   * @param m message describing the region to clear
+   * @param clientMessage message describing the region to clear
    */
-  private void handleClearRegion(Message m) {
+  private void handleClearRegion(Message clientMessage) {
     String regionName = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the clear-region message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received clear region message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       Object callbackArgument = callbackArgumentPart.getObject();
@@ -1065,17 +1082,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         logger.debug("Clearing region: {} callbackArgument: {}", regionName, callbackArgument);
       }
 
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
@@ -1088,7 +1106,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       if (region.hasServerProxy()) {
         // Locally clear the region
         region.basicBridgeClientClear(callbackArgument,
-            qManager.getState().getProcessedMarker() || !this.isDurableClient);
+            this.qManager.getState().getProcessedMarker() || !this.isDurableClient);
 
         if (isDebugEnabled) {
           logger.debug("Cleared region: {} callbackArgument: {}", regionName, callbackArgument);
@@ -1106,50 +1124,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * Locally invalidate a region NOTE: Added as part of bug#38048. The code only takes care of CQ
    * processing. Support needs to be added for local region invalidate.
    * 
-   * @param m message describing the region to clear
+   * @param clientMessage message describing the region to clear
    */
-  private void handleInvalidateRegion(Message m) {
+  private void handleInvalidateRegion(Message clientMessage) {
     String regionName = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the invalidate-region message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received invalidate region message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
       partCnt++; // Part callbackArgumentPart = m.getPart(partCnt++);
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
-      // Object callbackArgument = callbackArgumentPart.getObject();
 
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        // TODO: partCnt is unused
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
-        return;
-      }
-
-      // Verify that the region in question should respond to this
-      // message
-      if (region.hasServerProxy()) {
-        return;
       }
 
     } catch (Exception e) {
@@ -1163,40 +1175,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Register instantiators locally
    *
-   * @param msg message describing the new instantiators
+   * @param clientMessage message describing the new instantiators
    * @param eventId eventId of the instantiators
    */
-  private void handleRegisterInstantiator(Message msg, EventID eventId) {
+  private void handleRegisterInstantiator(Message clientMessage, EventID eventId) {
     String instantiatorClassName = null;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
-      int noOfParts = msg.getNumberOfParts();
+      int noOfParts = clientMessage.getNumberOfParts();
       if (isDebugEnabled) {
         logger.debug("{}: Received register instantiators message of parts {}", getName(),
             noOfParts);
       }
+
       Assert.assertTrue((noOfParts - 1) % 3 == 0);
-      for (int i = 0; i < noOfParts - 1; i = i + 3) {
+      for (int i = 0; i < noOfParts - 1; i += 3) {
         instantiatorClassName =
-            (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
-        String instantiatedClassName =
-            (String) CacheServerHelper.deserialize(msg.getPart(i + 1).getSerializedForm());
-        int id = msg.getPart(i + 2).getInt();
+            (String) CacheServerHelper.deserialize(clientMessage.getPart(i).getSerializedForm());
+        String instantiatedClassName = (String) CacheServerHelper
+            .deserialize(clientMessage.getPart(i + 1).getSerializedForm());
+        int id = clientMessage.getPart(i + 2).getInt();
         InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false,
-            eventId, null/* context */);
-        // distribute is false because we don't want to propagate this to
-        // servers recursively
+            eventId, null);
+        // distribute is false because we don't want to propagate this to servers recursively
       }
 
       // CALLBACK TESTING PURPOSE ONLY
       if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
-        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-        bo.afterReceivingFromServer(eventId);
+        ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+        clientServerObserver.afterReceivingFromServer(eventId);
       }
 
-    }
-    // TODO bug: can the following catch be more specific?
-    catch (Exception e) {
+    } catch (Exception e) {
       if (isDebugEnabled) {
         logger.debug("{}: Caught following exception while attempting to read Instantiator : {}",
             this, instantiatorClassName, e);
@@ -1207,6 +1218,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   private void handleRegisterDataSerializer(Message msg, EventID eventId) {
     Class dataSerializerClass = null;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       int noOfParts = msg.getNumberOfParts();
       if (isDebugEnabled) {
@@ -1220,8 +1232,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
               (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
           int id = msg.getPart(i + 1).getInt();
           InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id);
-          // distribute is false because we don't want to propagate this to
-          // servers recursively
+          // distribute is false because we don't want to propagate this to servers recursively
 
           int numOfClasses = msg.getPart(i + 2).getInt();
           int j = 0;
@@ -1230,7 +1241,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
                 (String) CacheServerHelper.deserialize(msg.getPart(i + 3 + j).getSerializedForm());
             InternalDataSerializer.updateSupportedClassesMap(dataSerializerClassName, className);
           }
-          i = i + 3 + j;
+
+          i += 3 + j;
         } catch (ClassNotFoundException e) {
           if (isDebugEnabled) {
             logger.debug(
@@ -1246,9 +1258,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         bo.afterReceivingFromServer(eventId);
       }
 
-    }
-    // TODO bug: can the following catch be more specific?
-    catch (Exception e) {
+    } catch (Exception e) {
       if (isDebugEnabled) {
         logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}",
             this, dataSerializerClass, e);
@@ -1259,93 +1269,87 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Processes message to invoke CQ listeners.
    */
-  private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
-      Object key, Object value) {
-    return processCqs(m, startMessagePart, numCqParts, messageType, key, value, null,
-        null/* eventId */);
+  private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+      int messageType, Object key, Object value) {
+    return processCqs(clientMessage, startMessagePart, numCqParts, messageType, key, value, null,
+        null);
   }
 
-  private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
-      Object key, Object value, byte[] delta, EventID eventId) {
+  private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+      int messageType, Object key, Object value, byte[] delta, EventID eventId) {
     HashMap cqs = new HashMap();
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
     for (int cqCnt = 0; cqCnt < numCqParts;) {
-      StringBuilder str = null;
+      StringBuilder sb = null;
       if (isDebugEnabled) {
-        str = new StringBuilder(100);
-        str.append("found these queries: ");
+        sb = new StringBuilder(100);
+        sb.append("found these queries: ");
       }
       try {
         // Get CQ Name.
-        Part cqNamePart = m.getPart(startMessagePart + (cqCnt++));
+        Part cqNamePart = clientMessage.getPart(startMessagePart + cqCnt++);
         // Get CQ Op.
-        Part cqOpPart = m.getPart(startMessagePart + (cqCnt++));
-        cqs.put(cqNamePart.getString(), Integer.valueOf(cqOpPart.getInt()));
+        Part cqOpPart = clientMessage.getPart(startMessagePart + cqCnt++);
+        cqs.put(cqNamePart.getString(), cqOpPart.getInt());
 
-        if (str != null) {
-          str.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append("  ");
+        if (sb != null) {
+          sb.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append("  ");
         }
-      } catch (Exception ex) {
+      } catch (Exception ignore) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientUpdater_ERROR_WHILE_PROCESSING_THE_CQ_MESSAGE_PROBLEM_WITH_READING_MESSAGE_FOR_CQ_0,
             cqCnt));
       }
-      if (isDebugEnabled && str != null) {
-        logger.debug(str);
+      if (isDebugEnabled) {
+        logger.debug(sb);
       }
     }
 
-    {
-      CqService cqService = this.cache.getCqService();
-      try {
-        cqService.dispatchCqListeners(cqs, messageType, key, value, delta, qManager, eventId);
-      } catch (Exception ex) {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
-            ex.getMessage()));
-        if (isDebugEnabled) {
-          logger.debug("Failed to invoke CQ Dispatcher.", ex);
-        }
+    CqService cqService = this.cache.getCqService();
+    try {
+      cqService.dispatchCqListeners(cqs, messageType, key, value, delta, this.qManager, eventId);
+    } catch (Exception ex) {
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
+          ex.getMessage()));
+      if (isDebugEnabled) {
+        logger.debug("Failed to invoke CQ Dispatcher.", ex);
       }
     }
 
-    return (startMessagePart + numCqParts);
+    return startMessagePart + numCqParts;
   }
 
-  private void handleRegisterInterest(Message m) {
+  private void handleRegisterInterest(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int interestType;
-    byte interestResultPolicy;
-    boolean isDurable;
-    boolean receiveUpdatesAsInvalidates;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the add interest message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received add interest message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part interestTypePart = m.getPart(partCnt++);
-      Part interestResultPolicyPart = m.getPart(partCnt++);
-      Part isDurablePart = m.getPart(partCnt++);
-      Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part interestTypePart = clientMessage.getPart(partCnt++);
+      Part interestResultPolicyPart = clientMessage.getPart(partCnt++);
+      Part isDurablePart = clientMessage.getPart(partCnt++);
+      Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
-      interestType = ((Integer) interestTypePart.getObject()).intValue();
-      interestResultPolicy = ((Byte) interestResultPolicyPart.getObject()).byteValue();
-      isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
-      receiveUpdatesAsInvalidates =
-          ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+      int interestType = (Integer) interestTypePart.getObject();
+      byte interestResultPolicy = (Byte) interestResultPolicyPart.getObject();
+      boolean isDurable = (Boolean) isDurablePart.getObject();
+      boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
 
       // Confirm that region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1375,38 +1379,34 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
-  private void handleUnregisterInterest(Message m) {
+  private void handleUnregisterInterest(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int interestType;
-    boolean isDurable;
-    boolean receiveUpdatesAsInvalidates;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the remove interest message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received remove interest message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part interestTypePart = m.getPart(partCnt++);
-      Part isDurablePart = m.getPart(partCnt++);
-      Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part interestTypePart = clientMessage.getPart(partCnt++);
+      Part isDurablePart = clientMessage.getPart(partCnt++);
+      Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
       // Not reading the eventId part
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
-      interestType = ((Integer) interestTypePart.getObject()).intValue();
-      isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
-      receiveUpdatesAsInvalidates =
-          ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+      int interestType = (Integer) interestTypePart.getObject();
+      boolean isDurable = (Boolean) isDurablePart.getObject();
+      boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
 
       // Confirm that region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1434,14 +1434,17 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
-  private void handleTombstoneOperation(Message msg) {
+  private void handleTombstoneOperation(Message clientMessage) {
     String regionName = "unknown";
+
     try { // not sure why this isn't done by the caller
       int partIdx = 0;
+
       // see ClientTombstoneMessage.getGFE70Message
-      regionName = msg.getPart(partIdx++).getString();
-      int op = msg.getPart(partIdx++).getInt();
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      regionName = clientMessage.getPart(partIdx++).getString();
+      int op = clientMessage.getPart(partIdx++).getInt();
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
       if (region == null) {
         if (!quitting()) {
           if (logger.isDebugEnabled()) {
@@ -1450,24 +1453,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         }
         return;
       }
+
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Received tombstone operation for region {} with operation={}", this,
             region, op);
       }
+
       if (!region.getConcurrencyChecksEnabled()) {
         return;
       }
+
       switch (op) {
         case 0:
           Map<VersionSource, Long> regionGCVersions =
-              (Map<VersionSource, Long>) msg.getPart(partIdx++).getObject();
-          EventID eventID = (EventID) msg.getPart(partIdx++).getObject();
+              (Map<VersionSource, Long>) clientMessage.getPart(partIdx++).getObject();
+          EventID eventID = (EventID) clientMessage.getPart(partIdx++).getObject();
           region.expireTombstones(regionGCVersions, eventID, null);
           break;
+
         case 1:
-          Set<Object> removedKeys = (Set<Object>) msg.getPart(partIdx++).getObject();
+          Set<Object> removedKeys = (Set<Object>) clientMessage.getPart(partIdx++).getObject();
           region.expireTombstoneKeys(removedKeys);
           break;
+
         default:
           throw new IllegalArgumentException("unknown operation type " + op);
       }
@@ -1483,22 +1491,21 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private boolean quitting() {
     if (isInterrupted()) {
-      // Any time an interrupt is thrown at this thread, regard it as a
-      // request to terminate
+      // Any time an interrupt is thrown at this thread, regard it as a request to terminate
       return true;
     }
-    if (!continueProcessing.get()) {
+    if (!this.continueProcessing.get()) {
       // de facto flag indicating we are to stop
       return true;
     }
-    if (cache != null && cache.getCancelCriterion().isCancelInProgress()) {
+    if (this.cache != null && this.cache.getCancelCriterion().isCancelInProgress()) {
       // System is cancelling
       return true;
     }
 
     // The pool stuff is really sick, so it's possible for us to have a distributed
     // system that is not the same as our cache. Check it just in case...
-    if (system.getCancelCriterion().isCancelInProgress()) {
+    if (this.system.getCancelCriterion().isCancelInProgress()) {
       return true;
     }
 
@@ -1520,15 +1527,15 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           this.failedUpdater.join(5000);
         }
       }
-    } catch (InterruptedException ie) {
+    } catch (InterruptedException ignore) {
       gotInterrupted = true;
-      return; // just bail, because I have not done anything yet
+      // just bail, because I have not done anything yet
     } finally {
       if (!gotInterrupted && this.failedUpdater != null) {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CacheClientUpdater_0_HAS_COMPLETED_WAITING_FOR_1,
             new Object[] {this, this.failedUpdater}));
-        failedUpdater = null;
+        this.failedUpdater = null;
       }
     }
   }
@@ -1537,6 +1544,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * Processes messages received from the server.
    * 
    * Only certain types of messages are handled.
+   *
+   * TODO: Method 'processMessages' is too complex to analyze by data flow algorithm
    * 
    * @see MessageType#CLIENT_MARKER
    * @see MessageType#LOCAL_CREATE
@@ -1547,11 +1556,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @see MessageType#CLEAR_REGION
    * @see ClientUpdateMessage
    */
-  protected void processMessages() {
+  private void processMessages() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
-      Part eid = null;
-      Message _message = initializeMessage();
+      Message clientMessage = initializeMessage();
+
       if (quitting()) {
         if (isDebugEnabled) {
           logger.debug("processMessages quitting early because we have stopped");
@@ -1559,11 +1568,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         // our caller calls close which will notify all waiters for our init
         return;
       }
+
       logger.info(LocalizedMessage
           .create(LocalizedStrings.CacheClientUpdater_0_READY_TO_PROCESS_MESSAGES, this));
 
-      while (continueProcessing.get()) {
-        // SystemFailure.checkFailure(); dm will check this
+      while (this.continueProcessing.get()) {
         if (quitting()) {
           if (isDebugEnabled) {
             logger.debug("termination detected");
@@ -1583,12 +1592,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
         try {
           // Read the message
-          _message.recv();
+          clientMessage.recv();
 
           // Wait for the previously failed cache client updater
           // to finish. This will avoid out of order messages.
           waitForFailedUpdater();
-          cache.waitForRegisterInterestsInProgress();
+          this.cache.waitForRegisterInterestsInProgress();
           if (quitting()) {
             if (isDebugEnabled) {
               logger.debug("processMessages quitting before processing message");
@@ -1597,7 +1606,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           }
 
           // If the message is a ping, ignore it
-          if (_message.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
+          if (clientMessage.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
             if (isDebugEnabled) {
               logger.debug("{}: Received ping", this);
             }
@@ -1605,76 +1614,80 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           }
 
           boolean isDeltaSent = false;
-          boolean isCreateOrUpdate = _message.getMessageType() == MessageType.LOCAL_CREATE
-              || _message.getMessageType() == MessageType.LOCAL_UPDATE;
+          boolean isCreateOrUpdate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE
+              || clientMessage.getMessageType() == MessageType.LOCAL_UPDATE;
           if (isCreateOrUpdate) {
-            isDeltaSent = ((Boolean) _message.getPart(2).getObject()).booleanValue();
+            isDeltaSent = (Boolean) clientMessage.getPart(2).getObject();
           }
 
           // extract the eventId and verify if it is a duplicate event
           // if it is a duplicate event, ignore
           // @since GemFire 5.1
-          int numberOfParts = _message.getNumberOfParts();
-          eid = _message.getPart(numberOfParts - 1);
+          int numberOfParts = clientMessage.getNumberOfParts();
+          Part eid = clientMessage.getPart(numberOfParts - 1);
+
           // TODO the message handling methods also deserialized the eventID - inefficient
           EventID eventId = (EventID) eid.getObject();
 
           // no need to verify if the instantiator msg is duplicate or not
-          if (_message.getMessageType() != MessageType.REGISTER_INSTANTIATORS
-              && _message.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
+          if (clientMessage.getMessageType() != MessageType.REGISTER_INSTANTIATORS
+              && clientMessage.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
             if (this.qManager.getState().verifyIfDuplicate(eventId,
                 !(this.isDurableClient || isDeltaSent))) {
               continue;
             }
           }
+
           if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
-            logger.trace(LogMarker.BRIDGE_SERVER,
-                "Processing event with id {}" + eventId.expensiveToString());
+            logger.trace(LogMarker.BRIDGE_SERVER, "Processing event with id {}",
+                eventId.expensiveToString());
           }
+
           this.isOpCompleted = true;
+
           // Process the message
-          switch (_message.getMessageType()) {
+          switch (clientMessage.getMessageType()) {
             case MessageType.LOCAL_CREATE:
             case MessageType.LOCAL_UPDATE:
-              handleUpdate(_message);
+              handleUpdate(clientMessage);
               break;
             case MessageType.LOCAL_INVALIDATE:
-              handleInvalidate(_message);
+              handleInvalidate(clientMessage);
               break;
             case MessageType.LOCAL_DESTROY:
-              handleDestroy(_message);
+              handleDestroy(clientMessage);
               break;
             case MessageType.LOCAL_DESTROY_REGION:
-              handleDestroyRegion(_message);
+              handleDestroyRegion(clientMessage);
               break;
             case MessageType.CLEAR_REGION:
-              handleClearRegion(_message);
+              handleClearRegion(clientMessage);
               break;
             case MessageType.REGISTER_INSTANTIATORS:
-              handleRegisterInstantiator(_message, eventId);
+              handleRegisterInstantiator(clientMessage, eventId);
               break;
             case MessageType.REGISTER_DATASERIALIZERS:
-              handleRegisterDataSerializer(_message, eventId);
+              handleRegisterDataSerializer(clientMessage, eventId);
               break;
             case MessageType.CLIENT_MARKER:
-              handleMarker(_message);
+              handleMarker(clientMessage);
               break;
             case MessageType.INVALIDATE_REGION:
-              handleInvalidateRegion(_message);
+              handleInvalidateRegion(clientMessage);
               break;
             case MessageType.CLIENT_REGISTER_INTEREST:
-              handleRegisterInterest(_message);
+              handleRegisterInterest(clientMessage);
               break;
             case MessageType.CLIENT_UNREGISTER_INTEREST:
-              handleUnregisterInterest(_message);
+              handleUnregisterInterest(clientMessage);
               break;
             case MessageType.TOMBSTONE_OPERATION:
-              handleTombstoneOperation(_message);
+              handleTombstoneOperation(clientMessage);
               break;
             default:
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.CacheClientUpdater_0_RECEIVED_AN_UNSUPPORTED_MESSAGE_TYPE_1,
-                  new Object[] {this, MessageType.getString(_message.getMessageType())}));
+                  new Object[] {this, MessageType.getString(clientMessage.getMessageType())}));
               break;
           }
 
@@ -1689,7 +1702,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           // likely to send pings...
           // and the ClientHealthMonitor will cause a disconnect
 
-        } catch (InterruptedIOException e) {
+        } catch (InterruptedIOException ignore) {
           // Per Sun's support web site, this exception seems to be peculiar
           // to Solaris, and may eventually not even be generated there.
           //
@@ -1697,62 +1710,59 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           // isInterrupted() is false. (How very odd!)
           //
           // We regard it the same as an InterruptedException
-          this.endPointDied = true;
 
-          continueProcessing.set(false);// = false;
+          this.continueProcessing.set(false);
           if (isDebugEnabled) {
             logger.debug("InterruptedIOException");
           }
+
         } catch (IOException e) {
-          this.endPointDied = true;
           // Either the server went away, or we caught a closing condition.
           if (!quitting()) {
             // Server departed; print a message.
-            String message = ": Caught the following exception and will exit: ";
-            String errMessage = e.getMessage();
-            if (errMessage == null) {
-              errMessage = "";
-            }
-            ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-            bo.beforeFailoverByCacheClientUpdater(this.location);
-            eManager.serverCrashed(this.endpoint);
+            ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+            clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+            this.eManager.serverCrashed(this.endpoint);
             if (isDebugEnabled) {
-              logger.debug("" + message + e);
+              logger.debug("Caught the following exception and will exit", e);
             }
           } // !quitting
 
           // In any event, terminate this thread.
-          continueProcessing.set(false);// = false;
+          this.continueProcessing.set(false);
           if (isDebugEnabled) {
             logger.debug("terminated due to IOException");
           }
+
         } catch (Exception e) {
           if (!quitting()) {
-            this.endPointDied = true;
-            ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-            bo.beforeFailoverByCacheClientUpdater(this.location);
-            eManager.serverCrashed(this.endpoint);
+            ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+            clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+            this.eManager.serverCrashed(this.endpoint);
             String message = ": Caught the following exception and will exit: ";
             handleException(message, e);
           }
+
           // In any event, terminate this thread.
-          continueProcessing.set(false);// = false; // force termination
+          this.continueProcessing.set(false);// = false; // force termination
           if (isDebugEnabled) {
             logger.debug("CCU terminated due to Exception");
           }
+
         } finally {
-          _message.clear();
+          clientMessage.clear();
         }
       } // while
+
     } finally {
       if (isDebugEnabled) {
         logger.debug("has stopped and cleaning the helper ..");
       }
-      this.close(); // added to fixes some race conditions associated with 38382
+      close(); // added to fix some race conditions associated with 38382
       // this will make sure that if this thread dies without starting QueueMgr then it will start..
       // 1. above we ignore InterruptedIOException and this thread dies without informing QueueMgr
-      // 2. if there is some other race codition with continueProcessing flag
-      this.qManager.checkEndpoint(this, endpoint);
+      // 2. if there is some other race condition with continueProcessing flag
+      this.qManager.checkEndpoint(this, this.endpoint);
     }
   }
 
@@ -1785,12 +1795,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private Object deserialize(byte[] serializedBytes) {
     Object deserializedObject = serializedBytes;
-    // This is a debugging method so ignore all exceptions like
-    // ClassNotFoundException
+    // This is a debugging method so ignore all exceptions like ClassNotFoundException
     try {
       DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
       deserializedObject = DataSerializer.readObject(dis);
-    } catch (Exception e) {
+    } catch (ClassNotFoundException | IOException ignore) {
     }
     return deserializedObject;
   }
@@ -1799,18 +1808,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @return the local port of our {@link #socket}
    */
   protected int getLocalPort() {
-    return socket.getLocalPort();
+    return this.socket.getLocalPort();
   }
 
+  @Override
   public void onDisconnect(InternalDistributedSystem sys) {
     stopUpdater();
   }
 
-  /**
-   * true if the EndPoint represented by this updater thread has died.
-   */
-  private volatile boolean endPointDied = false;
-
   private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
     if (actualBufferSize < requestedBufferSize) {
       logger.info(LocalizedMessage.create(
@@ -1826,11 +1831,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @since GemFire 5.7
    */
   public static class CCUStats implements MessageStats {
-    // static fields
+
     private static final StatisticsType type;
-    private final static int messagesBeingReceivedId;
-    private final static int messageBytesBeingReceivedId;
-    private final static int receivedBytesId;
+    private static final int messagesBeingReceivedId;
+    private static final int messageBytesBeingReceivedId;
+    private static final int receivedBytesId;
 
     static {
       StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
@@ -1852,7 +1857,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     // instance fields
     private final Statistics stats;
 
-    public CCUStats(DistributedSystem ids, ServerLocation location) {
+    CCUStats(DistributedSystem ids, ServerLocation location) {
       // no need for atomic since only a single thread will be writing these
       this.stats = ids.createStatistics(type, "CacheClientUpdater-" + location);
     }
@@ -1861,25 +1866,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       this.stats.close();
     }
 
+    @Override
     public void incReceivedBytes(long v) {
       this.stats.incLong(receivedBytesId, v);
     }
 
+    @Override
     public void incSentBytes(long v) {
       // noop since we never send messages
     }
 
+    @Override
     public void incMessagesBeingReceived(int bytes) {
-      stats.incInt(messagesBeingReceivedId, 1);
+      this.stats.incInt(messagesBeingReceivedId, 1);
       if (bytes > 0) {
-        stats.incLong(messageBytesBeingReceivedId, bytes);
+        this.stats.incLong(messageBytesBeingReceivedId, bytes);
       }
     }
 
+    @Override
     public void decMessagesBeingReceived(int bytes) {
-      stats.incInt(messagesBeingReceivedId, -1);
+      this.stats.incInt(messagesBeingReceivedId, -1);
       if (bytes > 0) {
-        stats.incLong(messageBytesBeingReceivedId, -bytes);
+        this.stats.incLong(messageBytesBeingReceivedId, -bytes);
       }
     }
 
@@ -1893,7 +1902,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
+  @Override
   public boolean isProcessing() {
-    return continueProcessing.get();
+    return this.continueProcessing.get();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
index be30061..39c2f3a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
@@ -152,7 +152,8 @@ public class ChunkedMessage extends Message {
 
   public void setLastChunkAndNumParts(boolean lastChunk, int numParts) {
     setLastChunk(lastChunk);
-    if (this.serverConnection != null && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
+    if (this.serverConnection != null
+        && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
       // we us e three bits for number of parts in last chunk byte
       // we us e three bits for number of parts in last chunk byte
       byte localLastChunk = (byte) (numParts << 5);
@@ -240,7 +241,8 @@ public class ChunkedMessage extends Message {
     int totalBytesRead = 0;
     do {
       int bytesRead = 0;
-      bytesRead = inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
+      bytesRead =
+          inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
       if (bytesRead == -1) {
         throw new EOFException(
             LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString());

http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index 354ad0f..2ac6fea 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -84,7 +84,8 @@ public class Message {
   // Tentative workaround to avoid OOM stated in #46754.
   public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>();
 
-  public static final String MAX_MESSAGE_SIZE_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size";
+  public static final String MAX_MESSAGE_SIZE_PROPERTY =
+      DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size";
 
   static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
 
@@ -299,8 +300,8 @@ public class Message {
     } else {
       HeapDataOutputStream hdos = new HeapDataOutputStream(str);
       try {
-      this.messageModified = true;
-      part.setPartState(hdos, false);
+        this.messageModified = true;
+        part.setPartState(hdos, false);
       } finally {
         close(hdos);
       }
@@ -309,8 +310,8 @@ public class Message {
   }
 
   /*
-   * Adds a new part to this message that contains a {@code byte} array (as opposed to a
-   * serialized object).
+   * Adds a new part to this message that contains a {@code byte} array (as opposed to a serialized
+   * object).
    *
    * @see #addPart(byte[], boolean)
    */
@@ -378,7 +379,7 @@ public class Message {
     if (this.version.equals(Version.CURRENT)) {
       v = null;
     }
-    
+
     // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources
     // passed to it.
     HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true);
@@ -399,12 +400,12 @@ public class Message {
     if (zipValues) {
       throw new UnsupportedOperationException("zipValues no longer supported");
     }
-    
+
     Version v = this.version;
     if (this.version.equals(Version.CURRENT)) {
       v = null;
     }
-    
+
     HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v);
     try {
       BlobHelper.serializeTo(o, hdos);
@@ -520,7 +521,8 @@ public class Message {
   }
 
   protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
-    // setting second bit of flags byte for client this is not require but this makes all changes easily at client side right now just see this bit and process security header
+    // setting second bit of flags byte for client this is not require but this makes all changes
+    // easily at client side right now just see this bit and process security header
     byte flagsByte = this.flags;
     if (isSecurityHeader) {
       flagsByte |= MESSAGE_HAS_SECURE_PART;
@@ -529,7 +531,7 @@ public class Message {
       flagsByte |= MESSAGE_IS_RETRY;
     }
     getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts)
-                   .putInt(this.transactionId).put(flagsByte);
+        .putInt(this.transactionId).put(flagsByte);
   }
 
   protected Part getSecurityPart() {
@@ -601,7 +603,7 @@ public class Message {
 
         if (msgLen > this.maxMessageSize) {
           throw new MessageTooLargeException("Message size (" + msgLen
-                                             + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")");
+              + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")");
         }
 
         commBuffer.clear();
@@ -673,7 +675,7 @@ public class Message {
   void fetchHeader() throws IOException {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    
+
     // messageType is invalidated here and can be used as an indicator
     // of problems reading the message
     this.messageType = MessageType.INVALID;
@@ -693,7 +695,7 @@ public class Message {
         }
       } while (cb.remaining() > 0);
       cb.flip();
-      
+
     } else {
       int hdr = 0;
       do {
@@ -728,7 +730,7 @@ public class Message {
       throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER
           .toLocalizedString(type));
     }
-    
+
     int timeToWait = 0;
     if (this.serverConnection != null) {
       // Keep track of the fact that a message is being processed.
@@ -736,7 +738,7 @@ public class Message {
       timeToWait = this.serverConnection.getClientReadTimeout();
     }
     this.readHeader = true;
-    
+
     if (this.messageLimiter != null) {
       for (;;) {
         this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
@@ -764,15 +766,13 @@ public class Message {
         }
       } // for
     }
-    
+
     if (len > 0) {
       if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
         throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1
-            .toLocalizedString(new Object[] {
-              len, this.maxIncomingMessageLength
-            }));
+            .toLocalizedString(new Object[] {len, this.maxIncomingMessageLength}));
       }
-      
+
       if (this.dataLimiter != null) {
         for (;;) {
           if (this.serverConnection != null) {
@@ -840,7 +840,7 @@ public class Message {
     if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) {
       throw new IOException(
           LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT
-              .toLocalizedString(new Object[] { len, numParts }));
+              .toLocalizedString(new Object[] {len, numParts}));
     }
 
     Integer msgType = MESSAGE_TYPE.get();
@@ -854,7 +854,7 @@ public class Message {
             + MessageType.getString(msgType) + " operation.");
       }
     }
-    
+
     setNumberOfParts(numParts);
     if (numParts <= 0) {
       return;
@@ -872,7 +872,8 @@ public class Message {
     int readSecurePart = checkAndSetSecurityPart();
 
     int bytesRemaining = len;
-    for (int i = 0; i < numParts + readSecurePart || readSecurePart == 1 && cb.remaining() > 0; i++) {
+    for (int i = 0; i < numParts + readSecurePart
+        || readSecurePart == 1 && cb.remaining() > 0; i++) {
       int bytesReadThisTime = readPartChunk(bytesRemaining);
       bytesRemaining -= bytesReadThisTime;
 
@@ -887,7 +888,7 @@ public class Message {
       int partLen = cb.getInt();
       byte partType = cb.get();
       byte[] partBytes = null;
-      
+
       if (partLen > 0) {
         partBytes = new byte[partLen];
         int alreadyReadBytes = cb.remaining();
@@ -897,7 +898,7 @@ public class Message {
           }
           cb.get(partBytes, 0, alreadyReadBytes);
         }
-        
+
         // now we need to read partLen - alreadyReadBytes off the wire
         int off = alreadyReadBytes;
         int remaining = partLen - off;
@@ -965,20 +966,20 @@ public class Message {
       // we already have the next part header in commBuffer so just return
       return 0;
     }
-    
+
     if (commBuffer.position() != 0) {
       commBuffer.compact();
     } else {
       commBuffer.position(commBuffer.limit());
       commBuffer.limit(commBuffer.capacity());
     }
-    
+
     if (this.serverConnection != null) {
       // Keep track of the fact that we are making progress
       this.serverConnection.updateProcessingMessage();
     }
     int bytesRead = 0;
-    
+
     if (this.socketChannel != null) {
       int remaining = commBuffer.remaining();
       if (remaining > bytesRemaining) {
@@ -1006,7 +1007,7 @@ public class Message {
         bytesToRead = bytesRemaining;
       }
       int pos = commBuffer.position();
-      
+
       while (bytesToRead > 0) {
         int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead);
         if (res != -1) {
@@ -1022,7 +1023,7 @@ public class Message {
                   .toLocalizedString());
         }
       }
-      
+
       commBuffer.position(pos);
     }
     commBuffer.flip();

http://git-wip-us.apache.org/repos/asf/geode/blob/f13ceee6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index dfda14f..485ccae 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -723,7 +723,10 @@ public class ServerConnection implements Runnable {
     ThreadState threadState = null;
     try {
       if (msg != null) {
-        // Since this thread is not interrupted when the cache server is shutdown, test again after a message has been read. This is a bit of a hack. I think this thread should be interrupted, but currently AcceptorImpl doesn't keep track of the threads that it launches.
+        // Since this thread is not interrupted when the cache server is shutdown, test again after
+        // a message has been read. This is a bit of a hack. I think this thread should be
+        // interrupted, but currently AcceptorImpl doesn't keep track of the threads that it
+        // launches.
         if (!this.processMessages || (crHelper.isShutdown())) {
           if (logger.isDebugEnabled()) {
             logger.debug("{} ignoring message of type {} from client {} due to shutdown.",


[6/6] geode git commit: Cleanup HARegionQueueJUnitTest and BlockingHARegionQueueJUnitTest

Posted by kl...@apache.org.
Cleanup HARegionQueueJUnitTest and BlockingHARegionQueueJUnitTest


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/4d4305de
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/4d4305de
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/4d4305de

Branch: refs/heads/feature/GEODE-2632-16
Commit: 4d4305dec61b45b05af38fcd6dd4c4e39c2f29c9
Parents: f13ceee
Author: Kirk Lund <kl...@apache.org>
Authored: Mon May 22 17:21:04 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Mon May 22 17:21:04 2017 -0700

----------------------------------------------------------------------
 .../ha/BlockingHARegionQueueJUnitTest.java      |  169 +-
 .../cache/ha/HARegionQueueJUnitTest.java        | 2256 ++++++++----------
 2 files changed, 1100 insertions(+), 1325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/4d4305de/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
index 39aa1e6..adbd7c0 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
@@ -14,166 +14,141 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.awaitility.Awaitility;
-
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.CacheException;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
  * Test runs all tests of HARegionQueueJUnitTest using BlockingHARegionQueue instead of
  * HARegionQueue.
- * 
- * 
  */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class BlockingHARegionQueueJUnitTest extends HARegionQueueJUnitTest {
 
-  /**
-   * Creates Blocking HA region-queue object
-   * 
-   * @return Blocking HA region-queue object
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
-   */
-  protected HARegionQueue createHARegionQueue(String name)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue =
-        HARegionQueue.getHARegionQueueInstance(name, cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
-    return regionqueue;
-  }
-
-  /**
-   * Creates Blocking HA region-queue object
-   * 
-   * @return Blocking HA region-queue object
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
-   */
-  protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
-        HARegionQueue.BLOCKING_HA_QUEUE, false);
-    return regionqueue;
+  @Override
+  protected int queueType() {
+    return HARegionQueue.BLOCKING_HA_QUEUE;
   }
 
   /**
    * Tests the effect of a put which is blocked because of capacity constraint & subsequent passage
    * because of take operation
-   * 
    */
   @Test
-  public void testBlockingPutAndTake()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndTake() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
-    final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", hrqa);
-    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only.
+
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (InterruptedException e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
     Conflatable conf = (Conflatable) hrq.take();
-    assertNotNull(conf);
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
+    assertThat(conf, notNullValue());
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
   }
 
   /**
    * Test Scenario : BlockingQueue capacity is 1. The first put should be successful. The second put
    * should block till a peek/remove happens.
-   * 
    */
   @Test
-  public void testBlockingPutAndPeekRemove()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndPeekRemove() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
-    final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndPeekRemove", hrqa);
+
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa);
     hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (Exception e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
     Conflatable conf = (Conflatable) hrq.peek();
-    assertNotNull(conf);
+    assertThat(conf, notNullValue());
+
     hrq.remove();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
-    assertFalse("Exception occurred in put-thread", encounteredException);
 
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
   }
 
   /**
    * Test Scenario :Blocking Queue capacity is 1. The first put should be successful.The second put
    * should block till the first put expires.
-   * 
+   * <p>
+   * fix for 40314 - capacity constraint is checked for primary only and
+   * expiry is not applicable on primary so marking this test as invalid.
    */
-  // fix for 40314 - capacity constraint is checked for primary only and
-  // expiry is not applicable on primary so marking this test as invalid.
-  @Ignore
   @Test
-  public void testBlockingPutAndExpiry()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndExpiry() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
     hrqa.setExpiryTime(1);
-    final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndExpiry", hrqa);
+
+    HARegionQueue hrq = this.createHARegionQueue(this.testName.getMethodName(), hrqa);
 
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
-    long start = System.currentTimeMillis();
+
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (Exception e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
-    waitAtLeast(1000, start, () -> {
-      assertFalse("Put-thread blocked unexpectedly", t1.isAlive());
-    });
-    assertFalse("Exception occurred in put-thread", encounteredException);
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get());
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive());
   }
 }


[5/6] geode git commit: Cleanup HARegionQueueJUnitTest and BlockingHARegionQueueJUnitTest

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/geode/blob/4d4305de/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 4028ab3..b4ecd33 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -15,6 +15,8 @@
 package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.number.OrderingComparison.*;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -23,101 +25,74 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.awaitility.Awaitility;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TestName;
 
-import org.apache.geode.LogWriter;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
  * This is a test for the APIs of a HARegionQueue and verifies that the head, tail and size counters
  * are updated properly.
+ *
+ * TODO: need to rewrite a bunch of tests in HARegionQueueJUnitTest
  */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class HARegionQueueJUnitTest {
 
-  /** The cache instance */
-  protected InternalCache cache = null;
+  /** total number of threads doing put operations */
+  private static final int TOTAL_PUT_THREADS = 10;
 
-  /** Logger for this test */
-  protected LogWriter logger;
+  private static HARegionQueue hrqForTestSafeConflationRemoval;
+  private static List list1;
 
-  /** The <code>RegionQueue</code> instance */
-  protected HARegionQueue rq;
+  protected InternalCache cache;
+  private HARegionQueue haRegionQueue;
 
-  /** total number of threads doing put operations */
-  private static final int TOTAL_PUT_THREADS = 10;
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
 
-  boolean expiryCalled = false;
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
 
-  volatile boolean encounteredException = false;
-  boolean allowExpiryToProceed = false;
-  boolean complete = false;
+  @Rule
+  public TestName testName = new TestName();
 
   @Before
   public void setUp() throws Exception {
-    cache = createCache();
-    logger = cache.getLogger();
-    encounteredException = false;
+    this.cache = createCache();
   }
 
   @After
   public void tearDown() throws Exception {
-    cache.close();
-  }
-
-  /**
-   * Creates the cache instance for the test
-   */
-  private InternalCache createCache() throws CacheException {
-    return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create();
-  }
-
-  /**
-   * Creates HA region-queue object
-   */
-  private HARegionQueue createHARegionQueue(String name)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache,
-        HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
-    return regionqueue;
-  }
-
-  /**
-   * Creates region-queue object
-   */
-  private HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
-        HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
-    return regionqueue;
+    this.cache.close();
+    hrqForTestSafeConflationRemoval = null;
   }
 
   /**
@@ -129,14 +104,10 @@ public class HARegionQueueJUnitTest {
    */
   @Test
   public void testQueuePutWithoutConflation() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation BEGIN");
-
-    rq = createHARegionQueue("testOfferNoConflation");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
     createAndRunProducers(false, false, false, putPerProducer);
-    assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation END");
+    assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
   }
 
   /**
@@ -149,14 +120,10 @@ public class HARegionQueueJUnitTest {
    */
   @Test
   public void testQueuePutWithConflation() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation BEGIN");
-
-    rq = createHARegionQueue("testOfferConflation");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
     createAndRunProducers(true, false, true, putPerProducer);
-    assertEquals(putPerProducer, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation END");
+    assertThat(this.haRegionQueue.size(), is(putPerProducer));
   }
 
   /**
@@ -166,319 +133,134 @@ public class HARegionQueueJUnitTest {
    * 3)Wait till all put-threads complete their job <br>
    * 4)verify that the size of the queue is equal to the total number of puts done by one thread (as
    * rest of them will be duplicates and hence will be replaced)
-   *
-   * TODO:Dinesh : Work on optimizing the handling of receiving duplicate events
    */
   @Test
   public void testQueuePutWithDuplicates() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates BEGIN");
-
-    rq = createHARegionQueue("testQueuePutWithDuplicates");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
-    // createAndRunProducers(false, true, true, putPerProducer);
-    /* Suyog: Only one thread can enter DACE at a time */
     createAndRunProducers(false, false, true, putPerProducer);
-    assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates END");
-  }
-
-  /**
-   * Creates and runs the put threads which will create the conflatable objects and add them to the
-   * queue
-   * 
-   * @param generateSameKeys - if all the producers need to put objects with same set of keys
-   *        (needed for conflation testing)
-   * @param generateSameIds - if all the producers need to put objects with same set of ids (needed
-   *        for duplicates testing)
-   * @param conflationEnabled - true if all producers need to put objects with conflation enabled,
-   *        false otherwise.
-   * @param putPerProducer - number of objects offered to the queue by each producer
-   * @throws Exception - thrown if any problem occurs in test execution
-   */
-  private void createAndRunProducers(boolean generateSameKeys, boolean generateSameIds,
-      boolean conflationEnabled, int putPerProducer) throws Exception {
-    Producer[] putThreads = new Producer[TOTAL_PUT_THREADS];
-
-    int i = 0;
-
-    // Create the put-threads, each generating same/different set of ids/keys as
-    // per the parameters
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      String keyPrefix = null;
-      long startId;
-      if (generateSameKeys) {
-        keyPrefix = "key";
-      } else {
-        keyPrefix = i + "key";
-      }
-      if (generateSameIds) {
-        startId = 1;
-      } else {
-        startId = i * 100000;
-      }
-      putThreads[i] =
-          new Producer("Producer-" + i, keyPrefix, startId, putPerProducer, conflationEnabled);
-    }
-
-    // start the put-threads
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      putThreads[i].start();
-    }
-
-    // call join on the put-threads so that this thread waits till they complete
-    // before doing verfication
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      ThreadUtils.join(putThreads[i], 30 * 1000);
-    }
-    assertFalse(encounteredException);
+    assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
   }
 
   /*
    * Test method for 'org.apache.geode.internal.cache.ha.HARegionQueue.addDispatchedMessage(Object)'
    */
   @Test
-  public void testAddDispatchedMessageObject() {
-    try {
-      // HARegionQueue haRegionQueue = new HARegionQueue("testing", cache);
-      HARegionQueue haRegionQueue = createHARegionQueue("testing");
-      assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
-      // TODO:
-
-      haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
-      haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
+  public void testAddDispatchedMessageObject() throws Exception {
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
+    assertThat(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
 
-      assertTrue(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
-      // HARegionQueue.getDispatchedMessagesMapForTesting().clear();
+    this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
+    this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
 
-    } catch (Exception e) {
-      throw new AssertionError("Test encountered an exception due to ", e);
-    }
+    assertThat(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
   }
 
   /**
    * tests the blocking peek functionality of BlockingHARegionQueue
    */
   @Test
-  public void testBlockQueue() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final HARegionQueue bQ = HARegionQueue.getHARegionQueueInstance("testing", cache,
-          HARegionQueue.BLOCKING_HA_QUEUE, false);
-      Thread[] threads = new Thread[10];
-      final CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
-      for (int i = 0; i < threads.length; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              barrier.await();
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockQueue() throws Exception {
+    HARegionQueue regionQueue = HARegionQueue.getHARegionQueueInstance(this.testName.getMethodName(), this.cache,
+        HARegionQueue.BLOCKING_HA_QUEUE, false);
+    Thread[] threads = new Thread[10];
+    int threadsLength = threads.length;
+    CyclicBarrier barrier = new CyclicBarrier(threadsLength + 1);
+
+    for (int i = 0; i < threadsLength; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            barrier.await();
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError("Failed :  failed since object was null and was not expected to be null"));
+            }
+            long totalTime = System.currentTimeMillis() - startTime;
 
-              if (totalTime < 2000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 2000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+            if (totalTime < 2000) {
+              errorCollector.addError(new AssertionError(" Failed :  Expected time to be greater than 2000 but it is not so "));
             }
+          } catch (Exception e) {
+            errorCollector.addError(e);
           }
-        };
-
-      }
-
-      for (int k = 0; k < threads.length; k++) {
-        threads[k].start();
-      }
-      barrier.await();
-      Thread.sleep(5000);
-
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      bQ.put(new ConflatableObject("key", "value", id, false, "testing"));
-
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < threads.length; k++) {
-        ThreadUtils.join(threads[k], 60 * 1000);
-      }
-
-      long totalTime = System.currentTimeMillis() - startTime;
-
-      if (totalTime >= 60000) {
-        fail(" Test taken too long ");
-      }
-
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
-
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+        }
+      };
     }
-  }
-
-  private static volatile int counter = 0;
-
-  protected boolean exceptionInThread = false;
-
-  protected boolean testFailed = false;
-
-  protected StringBuffer message = new StringBuffer();
-
-  protected Exception exception = null;
-
-  private synchronized int getCounter() {
-    return ++counter;
-  }
-
-  /**
-   * Thread to perform PUTs into the queue
-   */
-  class Producer extends Thread {
-    /** total number of puts by this thread */
-    long totalPuts = 0;
-
-    /** sleep between successive puts */
-    long sleeptime = 10;
-
-    /** prefix to keys of all objects put by this thread */
-    String keyPrefix;
-
-    /** startingId for sequence-ids of all objects put by this thread */
-    long startingId;
 
-    /** name of this producer thread */
-    String producerName;
+    for (Thread thread1 : threads) {
+      thread1.start();
+    }
+    
+    barrier.await();
+    
+    Thread.sleep(5000);
 
-    /**
-     * boolean to indicate whether this thread should create conflation enabled entries
-     */
-    boolean createConflatables;
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    regionQueue.put(new ConflatableObject("key", "value", id, false, this.testName.getMethodName()));
 
-    /**
-     * Constructor
-     * 
-     * @param name - name for this thread
-     * @param keyPrefix - prefix to keys of all objects put by this thread
-     * @param startingId - startingId for sequence-ids of all objects put by this thread
-     * @param totalPuts total number of puts by this thread
-     * @param createConflatableEvents - boolean to indicate whether this thread should create
-     *        conflation enabled entries
-     */
-    Producer(String name, String keyPrefix, long startingId, long totalPuts,
-        boolean createConflatableEvents) {
-      super(name);
-      this.producerName = name;
-      this.keyPrefix = keyPrefix;
-      this.startingId = startingId;
-      this.totalPuts = totalPuts;
-      this.createConflatables = createConflatableEvents;
-      setDaemon(true);
+    long startTime = System.currentTimeMillis();
+    for (Thread thread : threads) {
+      ThreadUtils.join(thread, 60 * 1000);
     }
 
-    /** Create Conflatable objects and put them into the Queue. */
-    @Override
-    public void run() {
-      if (producerName == null) {
-        producerName = Thread.currentThread().getName();
-      }
-      for (long i = 0; i < totalPuts; i++) {
-        String REGION_NAME = "test";
-        try {
-          ConflatableObject event = new ConflatableObject(keyPrefix + i, "val" + i,
-              new EventID(new byte[] {1}, startingId, startingId + i), createConflatables,
-              REGION_NAME);
-
-          logger.fine("putting for key =  " + keyPrefix + i);
-          rq.put(event);
-          Thread.sleep(sleeptime);
-        } catch (VirtualMachineError e) {
-          SystemFailure.initiateFailure(e);
-          throw e;
-        } catch (Throwable e) {
-          logger.severe("Exception while running Producer;continue running.", e);
-          encounteredException = true;
-          break;
-        }
-      }
-      logger.info(producerName + " :  Puts completed");
+    long totalTime = System.currentTimeMillis() - startTime;
+
+    if (totalTime >= 60000) {
+      fail(" Test taken too long ");
     }
   }
 
   /**
-   * tests whether expiry of entry in the regin queue occurs as expected
+   * tests whether expiry of entry in the region queue occurs as expected
    */
   @Test
-  public void testExpiryPositive()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testExpiryPositive() throws Exception {
     HARegionQueueAttributes haa = new HARegionQueueAttributes();
     haa.setExpiryTime(1);
-    HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
     long start = System.currentTimeMillis();
-    regionqueue.put(
-        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
-    Map map = (Map) regionqueue.getConflationMapForTesting().get("testing");
+
+    regionQueue.put(
+        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, this.testName.getMethodName()));
+
+    Map map = (Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName());
     waitAtLeast(1000, start, () -> {
-      assertEquals(Collections.EMPTY_MAP, map);
-      assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
+      assertThat(map, is(Collections.emptyMap()));
+      assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
     });
   }
 
   /**
-   * Wait until a given runnable stops throwing exceptions. It should take at least
-   * minimumElapsedTime after the supplied start time to happen.
-   *
-   * This is useful for validating that an entry doesn't expire until a certain amount of time has
-   * passed
-   */
-  protected void waitAtLeast(final int minimumElapsedTIme, final long start,
-      final Runnable runnable) {
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(runnable);
-    long elapsed = System.currentTimeMillis() - start;
-    assertTrue(elapsed >= minimumElapsedTIme);
-  }
-
-  /**
    * tests whether expiry of a conflated entry in the region queue occurs as expected
    */
   @Test
-  public void testExpiryPositiveWithConflation()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testExpiryPositiveWithConflation() throws Exception {
     HARegionQueueAttributes haa = new HARegionQueueAttributes();
     haa.setExpiryTime(1);
-    HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
     long start = System.currentTimeMillis();
-    regionqueue.put(
-        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
-    regionqueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
-        true, "testing"));
-    assertTrue(
-        " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
-        !(regionqueue.size() == 0));
-    assertTrue(
-        " Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ",
-        !(regionqueue.getAvalaibleIds().size() == 0));
-    assertTrue(
-        " Expected conflation map size not  to be zero since expiry time has not been exceeded but it is not so "
-            + ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))),
-        !((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key")) == null));
-    assertTrue(
-        " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
-        !(regionqueue.getEventsMapForTesting().size() == 0));
+
+    regionQueue.put(
+        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, this.testName.getMethodName()));
+
+    regionQueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
+        true, this.testName.getMethodName()));
+
+    assertThat(" Expected region size not to be zero since expiry time has not been exceeded but it is not so ", !regionQueue.isEmpty(), is(true));
+    assertThat(" Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ", !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+    assertThat(" Expected conflation map size not  to be zero since expiry time has not been exceeded but it is not so " + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).get("key"), ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).get("key"), not(sameInstance(null)));
+    assertThat(" Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
 
     waitAtLeast(1000, start, () -> {
-      assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
-      assertEquals(Collections.EMPTY_SET, regionqueue.getAvalaibleIds());
-      assertEquals(Collections.EMPTY_MAP, regionqueue.getConflationMapForTesting().get("testing"));
-      assertEquals(Collections.EMPTY_MAP, regionqueue.getEventsMapForTesting());
+      assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
+      assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet()));
+      assertThat(regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()), is(Collections.emptyMap()));
+      assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap()));
     });
   }
 
@@ -486,38 +268,29 @@ public class HARegionQueueJUnitTest {
    * tests a ThreadId not being expired if it was updated
    */
   @Test
-  public void testNoExpiryOfThreadId() {
-    try {
-      HARegionQueueAttributes haa = new HARegionQueueAttributes();
-      haa.setExpiryTime(45);
-      // RegionQueue regionqueue = new HARegionQueue("testing", cache, haa);
-      HARegionQueue regionqueue = createHARegionQueue("testing", haa);
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
-      EventID ev2 = new EventID(new byte[] {1}, 1, 2);
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
-      Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
-      regionqueue.put(cf1);
-      final long tailKey = regionqueue.tailKey.get();
-      regionqueue.put(cf2);
-      // Invalidate will trigger the expiration of the entry
-      // See HARegionQueue.createCacheListenerForHARegion
-      regionqueue.getRegion().invalidate(tailKey);
-      assertTrue(
-          " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
-          !(regionqueue.size() == 0));
-      assertTrue(" Expected the available id's size not  to have counter 1 but it has ",
-          !(regionqueue.getAvalaibleIds().contains(new Long(1))));
-      assertTrue(" Expected the available id's size to have counter 2 but it does not have ",
-          (regionqueue.getAvalaibleIds().contains(new Long(2))));
-      assertTrue(" Expected eventID map not to have the first event, but it has",
-          !(regionqueue.getCurrentCounterSet(ev1).contains(new Long(1))));
-      assertTrue(" Expected eventID map to have the second event, but it does not",
-          (regionqueue.getCurrentCounterSet(ev2).contains(new Long(2))));
-    }
-
-    catch (Exception e) {
-      throw new AssertionError("test failed due to ", e);
-    }
+  public void testNoExpiryOfThreadId() throws Exception {
+    HARegionQueueAttributes haa = new HARegionQueueAttributes();
+    haa.setExpiryTime(45);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    EventID ev2 = new EventID(new byte[] {1}, 1, 2);
+    Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+
+    regionQueue.put(cf1);
+    long tailKey = regionQueue.tailKey.get();
+    regionQueue.put(cf2);
+
+    // Invalidate will trigger the expiration of the entry
+    // See HARegionQueue.createCacheListenerForHARegion
+    regionQueue.getRegion().invalidate(tailKey);
+
+    assertThat(" Expected region size not to be zero since expiry time has not been exceeded but it is not so ", !regionQueue.isEmpty(), is(true));
+    assertThat(" Expected the available id's size not  to have counter 1 but it has ", !regionQueue.getAvalaibleIds().contains(1L), is(true));
+    assertThat(" Expected the available id's size to have counter 2 but it does not have ", regionQueue.getAvalaibleIds().contains(2L), is(true));
+    assertThat(" Expected eventID map not to have the first event, but it has", !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true));
+    assertThat(" Expected eventID map to have the second event, but it does not", regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
   }
 
   /**
@@ -525,66 +298,51 @@ public class HARegionQueueJUnitTest {
    * being put in the queue
    */
   @Test
-  public void testQRMComingBeforeLocalPut() {
-    try {
-      // RegionQueue regionqueue = new HARegionQueue("testing", cache);
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      regionqueue.removeDispatchedEvents(id);
-      regionqueue.put(new ConflatableObject("key", "value", id, true, "testing"));
-      assertTrue(" Expected key to be null since QRM for the message id had already arrived ",
-          !regionqueue.getRegion().containsKey(new Long(1)));
-    } catch (Exception e) {
-      throw new AssertionError("test failed due to ", e);
-    }
+  public void testQRMComingBeforeLocalPut() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+
+    regionQueue.removeDispatchedEvents(id);
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
+
+    assertThat(" Expected key to be null since QRM for the message id had already arrived ", !regionQueue.getRegion().containsKey(1L), is(true));
   }
 
   /**
    * test verifies correct expiry of ThreadIdentifier in the HARQ if no corresponding put comes
    */
   @Test
-  public void testOnlyQRMComing() throws InterruptedException, IOException, ClassNotFoundException {
+  public void testOnlyQRMComing() throws Exception {
     HARegionQueueAttributes harqAttr = new HARegionQueueAttributes();
     harqAttr.setExpiryTime(1);
-    // RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr);
-    HARegionQueue regionqueue = createHARegionQueue("testing", harqAttr);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), harqAttr);
     EventID id = new EventID(new byte[] {1}, 1, 1);
     long start = System.currentTimeMillis();
-    regionqueue.removeDispatchedEvents(id);
-    assertTrue(" Expected testingID to be present since only QRM achieved ",
-        regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)));
+
+    regionQueue.removeDispatchedEvents(id);
+
+    assertThat(" Expected testingID to be present since only QRM achieved ", regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] { 1 }, 1)), is(true));
+
     waitAtLeast(1000, start,
-        () -> assertTrue(
-            " Expected testingID not to be present since it should have expired after 2.5 seconds",
-            !regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1))));
+        () -> assertThat(" Expected testingID not to be present since it should have expired after 2.5 seconds", !regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] { 1 }, 1)), is(true)));
   }
 
   /**
    * test all relevant data structures are updated on a local put
    */
   @Test
-  public void testPutPath() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      Conflatable cf =
-          new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing");
-      regionqueue.put(cf);
-      assertTrue(" Expected region peek to return cf but it is not so ",
-          (regionqueue.peek().equals(cf)));
-      assertTrue(
-          " Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ",
-          !(regionqueue.getAvalaibleIds().size() == 0));
-      assertTrue(
-          " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
-          ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))
-              .equals(new Long(1))));
-      assertTrue(
-          " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
-          !(regionqueue.getEventsMapForTesting().size() == 0));
+  public void testPutPath() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    Conflatable cf =
+        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, this.testName.getMethodName());
 
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
-    }
+    regionQueue.put(cf);
+
+    assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf));
+    assertThat(" Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ", !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+    assertThat(" Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ", ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).get("key"), is(1L));
+    assertThat(" Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
   }
 
   /**
@@ -592,58 +350,60 @@ public class HARegionQueueJUnitTest {
    * there - verify the next five entries and their relevant data is present
    */
   @Test
-  public void testQRMDispatch() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      Conflatable[] cf = new Conflatable[10];
-      // put 10 conflatable objects
-      for (int i = 0; i < 10; i++) {
-        cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
-            "testing");
-        regionqueue.put(cf[i]);
-      }
-      // remove the first 5 by giving the right sequence id
-      regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
-      // verify 1-5 not in region
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getRegion().containsKey(new Long(i)));
-      }
-      // verify 6-10 still in region queue
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getRegion().containsKey(new Long(i)));
-      }
-      // verify 1-5 not in conflation map
-      for (long i = 0; i < 5; i++) {
-        assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
-            .containsKey("key" + i));
-      }
-      // verify 6-10 in conflation map
-      for (long i = 5; i < 10; i++) {
-        assertTrue(
-            ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
-      }
+  public void testQRMDispatch() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    Conflatable[] cf = new Conflatable[10];
+
+    // put 10 conflatable objects
+    for (int i = 0; i < 10; i++) {
+      cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
+        this.testName.getMethodName());
+      regionQueue.put(cf[i]);
+    }
 
-      EventID eid = new EventID(new byte[] {1}, 1, 6);
-      // verify 1-5 not in eventMap
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
-      // verify 6-10 in event Map
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
+    // remove the first 5 by giving the right sequence id
+    regionQueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
 
-      // verify 1-5 not in available Id's map
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
+    // verify 1-5 not in region
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getRegion().containsKey((long)i), is(true));
+    }
 
-      // verify 6-10 in available id's map
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    // verify 6-10 still in region queue
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getRegion().containsKey((long)i), is(true));
+    }
+
+    // verify 1-5 not in conflation map
+    for (int i = 0; i < 5; i++) {
+      assertThat(!((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).containsKey("key" + i), is(true));
+    }
+
+    // verify 6-10 in conflation map
+    for (int i = 5; i < 10; i++) {
+      assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).containsKey("key" + i), is(true));
+    }
+
+    EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+    // verify 1-5 not in eventMap
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long)i), is(true));
+    }
+
+    // verify 6-10 in event Map
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getCurrentCounterSet(eid).contains((long)i), is(true));
+    }
+
+    // verify 1-5 not in available Id's map
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getAvalaibleIds().contains((long)i), is(true));
+    }
+
+    // verify 6-10 in available id's map
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getAvalaibleIds().contains((long)i), is(true));
     }
   }
 
@@ -652,68 +412,69 @@ public class HARegionQueueJUnitTest {
    * 1-7 not there - verify data for 8-10 is there
    */
   @Test
-  public void testQRMBeforePut() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
+  public void testQRMBeforePut() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
 
-      EventID[] ids = new EventID[10];
+    EventID[] ids = new EventID[10];
 
-      for (int i = 0; i < 10; i++) {
-        ids[i] = new EventID(new byte[] {1}, 1, i);
-      }
+    for (int i = 0; i < 10; i++) {
+      ids[i] = new EventID(new byte[] {1}, 1, i);
+    }
 
-      // first get the qrm message for the seventh id
-      regionqueue.removeDispatchedEvents(ids[6]);
-      Conflatable[] cf = new Conflatable[10];
-      // put 10 conflatable objects
-      for (int i = 0; i < 10; i++) {
-        cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, "testing");
-        regionqueue.put(cf[i]);
-      }
+    // first get the qrm message for the seventh id
+    regionQueue.removeDispatchedEvents(ids[6]);
+    Conflatable[] cf = new Conflatable[10];
 
-      // verify 1-7 not in region
-      Set values = (Set) regionqueue.getRegion().values();
-      for (int i = 0; i < 7; i++) {
-        System.out.println(i);
-        assertTrue(!values.contains(cf[i]));
-      }
-      // verify 8-10 still in region queue
-      for (int i = 7; i < 10; i++) {
-        System.out.println(i);
-        assertTrue(values.contains(cf[i]));
-      }
-      // verify 1-8 not in conflation map
-      for (long i = 0; i < 7; i++) {
-        assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
-            .containsKey("key" + i));
-      }
-      // verify 8-10 in conflation map
-      for (long i = 7; i < 10; i++) {
-        assertTrue(
-            ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
-      }
+    // put 10 conflatable objects
+    for (int i = 0; i < 10; i++) {
+      cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, this.testName.getMethodName());
+      regionQueue.put(cf[i]);
+    }
 
-      EventID eid = new EventID(new byte[] {1}, 1, 6);
-      // verify 1-7 not in eventMap
-      for (long i = 4; i < 11; i++) {
-        assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
-      // verify 8-10 in event Map
-      for (long i = 1; i < 4; i++) {
-        assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
+    // verify 1-7 not in region
+    Set values = (Set) regionQueue.getRegion().values();
 
-      // verify 1-7 not in available Id's map
-      for (long i = 4; i < 11; i++) {
-        assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
+    for (int i = 0; i < 7; i++) {
+      System.out.println(i);
+      assertThat(!values.contains(cf[i]), is(true));
+    }
 
-      // verify 8-10 in available id's map
-      for (long i = 1; i < 4; i++) {
-        assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    // verify 8-10 still in region queue
+    for (int i = 7; i < 10; i++) {
+      System.out.println(i);
+      assertThat(values.contains(cf[i]), is(true));
+    }
+
+    // verify 1-8 not in conflation map
+    for (int i = 0; i < 7; i++) {
+      assertThat(!((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).containsKey("key" + i), is(true));
+    }
+
+    // verify 8-10 in conflation map
+    for (int i = 7; i < 10; i++) {
+      assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())).containsKey("key" + i), is(true));
+    }
+
+    EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+    // verify 1-7 not in eventMap
+    for (int i = 4; i < 11; i++) {
+      assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long)i), is(true));
+    }
+
+    // verify 8-10 in event Map
+    for (int i = 1; i < 4; i++) {
+      assertThat(regionQueue.getCurrentCounterSet(eid).contains((long)i), is(true));
+    }
+
+    // verify 1-7 not in available Id's map
+    for (int i = 4; i < 11; i++) {
+      assertThat(!regionQueue.getAvalaibleIds().contains((long)i), is(true));
+    }
+
+    // verify 8-10 in available id's map
+    for (int i = 1; i < 4; i++) {
+      assertThat(regionQueue.getAvalaibleIds().contains((long)i), is(true));
     }
   }
 
@@ -721,33 +482,31 @@ public class HARegionQueueJUnitTest {
    * test to verify conflation happens as expected
    */
   @Test
-  public void testConflation() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
-      EventID ev2 = new EventID(new byte[] {1}, 2, 2);
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
-      Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
-      regionqueue.put(cf1);
-      Map conflationMap = regionqueue.getConflationMapForTesting();
-      assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(1)));
-      regionqueue.put(cf2);
-      // verify the conflation map has recorded the new key
-      assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(2)));
-      // the old key should not be present
-      assertTrue(!regionqueue.getRegion().containsKey(new Long(1)));
-      // available ids should not contain the old id (the old position)
-      assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(1)));
-      // available id should have the new id (the new position)
-      assertTrue(regionqueue.getAvalaibleIds().contains(new Long(2)));
-      // events map should not contain the old position
-      assertTrue(regionqueue.getCurrentCounterSet(ev1).isEmpty());
-      // events map should contain the new position
-      assertTrue(regionqueue.getCurrentCounterSet(ev2).contains(new Long(2)));
-
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
-    }
+  public void testConflation() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    EventID ev2 = new EventID(new byte[] {1}, 2, 2);
+    Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+    regionQueue.put(cf1);
+
+    Map conflationMap = regionQueue.getConflationMapForTesting();
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(1L));
+
+    regionQueue.put(cf2);
+
+    // verify the conflation map has recorded the new key
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(2L));
+    // the old key should not be present
+    assertThat(!regionQueue.getRegion().containsKey(1L), is(true));
+    // available ids should not contain the old id (the old position)
+    assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true));
+    // available id should have the new id (the new position)
+    assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true));
+    // events map should not contain the old position
+    assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true));
+    // events map should contain the new position
+    assertThat(regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
   }
 
   /**
@@ -755,96 +514,52 @@ public class HARegionQueueJUnitTest {
    * events which are of ID greater than that contained in QRM should stay
    */
   @Test
-  public void testQRM() {
-    try {
-      RegionQueue regionqueue = createHARegionQueue("testing");
-      for (int i = 0; i < 10; ++i) {
-        regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
-            new EventID(new byte[] {1}, 1, i + 1), true, "testing"));
-      }
-      EventID qrmID = new EventID(new byte[] {1}, 1, 5);
-      ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
-      Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
-      assertTrue(((Map) (conflationMap.get("testing"))).size() == 5);
-
-      Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
-      Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
-      assertTrue(availableIDs.size() == 5);
-      assertTrue(counters.size() == 5);
-      for (int i = 5; i < 10; ++i) {
-        assertTrue(((Map) (conflationMap.get("testing"))).containsKey("key" + (i + 1)));
-        assertTrue(availableIDs.contains(new Long((i + 1))));
-        assertTrue(counters.contains(new Long((i + 1))));
-      }
-      Region rgn = ((HARegionQueue) regionqueue).getRegion();
-      assertTrue(rgn.keySet().size() == 6);
+  public void testQRM() throws Exception {
+    RegionQueue regionqueue = createHARegionQueue(this.testName.getMethodName());
 
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    for (int i = 0; i < 10; ++i) {
+      regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
+          new EventID(new byte[] {1}, 1, i + 1), true, this.testName.getMethodName()));
     }
-  }
 
-  protected static HARegionQueue hrqFortestSafeConflationRemoval;
+    EventID qrmID = new EventID(new byte[] {1}, 1, 5);
+    ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
+    Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).size(), is(5));
 
-  /**
-   * This test tests safe removal from the conflation map. i.e operations should only remove old
-   * values and not the latest value
-   */
-  @Test
-  public void testSafeConflationRemoval() {
-    try {
-      hrqFortestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval",
-
-          cache, this);
-      Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
-          true, "testSafeConflationRemoval");
-      hrqFortestSafeConflationRemoval.put(cf1);
-      hrqFortestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
-      Map map =
-
-          (Map) hrqFortestSafeConflationRemoval.getConflationMapForTesting()
-              .get("testSafeConflationRemoval");
-      assertTrue(
-          "Expected the counter to be 2 since it should not have been deleted but it is not so ",
-          map.get("key1").equals(new Long(2)));
-      hrqFortestSafeConflationRemoval = null;
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to ", e);
-    }
-  }
+    Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
+    Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
 
-  /**
-   * Extends HARegionQueue for testing purposes. used by testSafeConflationRemoval
-   */
-  static class HARQTestClass extends HARegionQueue.TestOnlyHARegionQueue {
+    assertThat(availableIDs.size(), is(5));
+    assertThat(counters.size(), is(5));
 
-    public HARQTestClass(String REGION_NAME, InternalCache cache, HARegionQueueJUnitTest test)
-        throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-      super(REGION_NAME, cache);
+    for (int i = 5; i < 10; ++i) {
+      assertThat(((Map) (conflationMap.get(this.testName.getMethodName()))).containsKey("key" + (i + 1)), is(true));
+      assertThat(availableIDs.contains((long) (i+1)), is(true));
+      assertThat(counters.contains((long) (i+1)), is(true));
     }
 
-    ConcurrentMap createConcurrentMap() {
-      return new ConcHashMap();
-    }
+    Region rgn = ((HARegionQueue) regionqueue).getRegion();
+    assertThat(rgn.keySet().size(), is(6));
   }
 
   /**
-   * Used to override the remove method for testSafeConflationRemoval
+   * This test tests safe removal from the conflation map. i.e operations should only remove old
+   * values and not the latest value
    */
-  static class ConcHashMap extends ConcurrentHashMap implements ConcurrentMap {
-    public boolean remove(Object arg0, Object arg1) {
-      Conflatable cf2 = new ConflatableObject("key1", "value2", new EventID(new byte[] {1}, 1, 2),
-          true, "testSafeConflationRemoval");
-      try {
-        hrqFortestSafeConflationRemoval.put(cf2);
-      } catch (Exception e) {
-        throw new AssertionError("Exception occurred in trying to put ", e);
-      }
-      return super.remove(arg0, arg1);
-    }
-  }
+  @Test
+  public void testSafeConflationRemoval() throws Exception {
+    hrqForTestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", this.cache);
+    Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
+        true, "testSafeConflationRemoval");
 
-  static List list1;
+    hrqForTestSafeConflationRemoval.put(cf1);
+    hrqForTestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
+
+    Map map = (Map) hrqForTestSafeConflationRemoval.getConflationMapForTesting().get("testSafeConflationRemoval");
+
+    assertThat("Expected the counter to be 2 since it should not have been deleted but it is not so ", map.get("key1"), is(2L));
+  }
 
   /**
    * This test tests remove operation is causing the insertion of sequence ID for existing
@@ -864,80 +579,83 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that all the sequence should be greater than x
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq.addDispatchedMessage(ids[i], i);
-      }
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = false;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+  public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() throws Exception {
+    long numberOfIterations = 1000;
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+    HARegionQueue.stopQRMThread();
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq.addDispatchedMessage(ids[i], i);
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
         }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
-      iterator = list2.iterator();
-      doOnce = false;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
         }
       }
-      iterator = map.values().iterator();
-      Long max = new Long(numberOfIterations);
-      Long next;
-      while (iterator.hasNext()) {
-        next = ((Long) iterator.next());
-        assertTrue(" Expected all the sequence ID's to be greater than " + max
-            + " but it is not so. Got sequence id " + next, next.compareTo(max) >= 0);
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = false;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(new Long(id.getThreadID()), id.getSequenceID());
+      }
+    }
+
+    iterator = list2.iterator();
+    doOnce = false;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
       }
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    }
+
+    iterator = map.values().iterator();
+    Long max = numberOfIterations;
+    while (iterator.hasNext()) {
+      Long next = (Long) iterator.next();
+      assertThat(" Expected all the sequence ID's to be greater than " + max + " but it is not so. Got sequence id " + next, next.compareTo(max), greaterThanOrEqualTo(0));
     }
   }
 
@@ -958,77 +676,79 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that the map size should be 2x
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq.addDispatchedMessage(ids[i], i);
-      }
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
-            hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = false;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+  public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() throws Exception {
+    int numberOfIterations = 1000;
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+    HARegionQueue.stopQRMThread();
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq.addDispatchedMessage(ids[i], i);
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
         }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
-      iterator = list2.iterator();
-      doOnce = false;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+          hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
         }
       }
-      assertTrue(
-          " Expected the map size to be " + (2 * numberOfIterations) + " but it is " + map.size(),
-          map.size() == (2 * numberOfIterations));
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to an unexpected exception : ", e);
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = false;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
+      }
     }
+
+    iterator = list2.iterator();
+    doOnce = false;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
+      }
+    }
+    assertThat(" Expected the map size to be " + 2 * numberOfIterations + " but it is " + map.size(), map.size(), is(2 * numberOfIterations));
   }
 
   /**
@@ -1050,101 +770,95 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that a total of x entries are present in the map
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() {
-    try {
-      final long numberOfIterations = 10000;
-      final HARegionQueue hrq1 = createHARegionQueue("testConcurrentDispatcherAndRemoval1");
+  public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() throws Exception {
+    int numberOfIterations = 10000;
+    HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+    HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+    HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+    HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+    HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
 
-      final HARegionQueue hrq2 = createHARegionQueue("testConcurrentDispatcherAndRemoval2");
+    HARegionQueue.stopQRMThread();
 
-      final HARegionQueue hrq3 = createHARegionQueue("testConcurrentDispatcherAndRemoval3");
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
 
-      final HARegionQueue hrq4 = createHARegionQueue("testConcurrentDispatcherAndRemoval4");
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq1.addDispatchedMessage(ids[i], i);
+      hrq2.addDispatchedMessage(ids[i], i);
 
-      final HARegionQueue hrq5 = createHARegionQueue("testConcurrentDispatcherAndRemoval5");
-
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq1.addDispatchedMessage(ids[i], i);
-        hrq2.addDispatchedMessage(ids[i], i);
+    }
 
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
+    };
 
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            hrq3.addDispatchedMessage(ids[i], i);
-            hrq4.addDispatchedMessage(ids[i], i);
-            hrq5.addDispatchedMessage(ids[i], i);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = true;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          hrq3.addDispatchedMessage(ids[i], i);
+          hrq4.addDispatchedMessage(ids[i], i);
+          hrq5.addDispatchedMessage(ids[i], i);
         }
       }
-
-      iterator = list2.iterator();
-      doOnce = true;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = true;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next();// region name;
+        int size = (Integer) iterator.next();
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
         }
       }
-      assertTrue(
-          " Expected the map size to be " + (numberOfIterations) + " but it is " + map.size(),
-          map.size() == (numberOfIterations));
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    iterator = list2.iterator();
+    doOnce = true;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next();// region name;
+        int size = (Integer) iterator.next();
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+        }
+      }
     }
+
+    assertThat(" Expected the map size to be " + numberOfIterations + " but it is " + map.size(), map.size(), is(numberOfIterations));
   }
 
   /**
@@ -1168,203 +882,172 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that the map size should be 2x * number of regions
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq1 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval1");
-
-      final HARegionQueue hrq2 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval2");
-      final HARegionQueue hrq3 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval3");
-      final HARegionQueue hrq4 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval4");
-      final HARegionQueue hrq5 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval5");
-
-      HARegionQueue.stopQRMThread();
-
-      final ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+  public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() throws Exception {
+    int numberOfIterations = 1000;
+    HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+    HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+    HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+    HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+    HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
+
+    HARegionQueue.stopQRMThread();
+
+    ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
+      ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
+      ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
+      ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
+      ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
+      hrq1.addDispatchedMessage(ids1[i], i);
+      hrq2.addDispatchedMessage(ids2[i], i);
+      hrq3.addDispatchedMessage(ids3[i], i);
+      hrq4.addDispatchedMessage(ids4[i], i);
+      hrq5.addDispatchedMessage(ids5[i], i);
+    }
 
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
-        ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
-        ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
-        ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
-        ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
-        hrq1.addDispatchedMessage(ids1[i], i);
-        hrq2.addDispatchedMessage(ids2[i], i);
-        hrq3.addDispatchedMessage(ids3[i], i);
-        hrq4.addDispatchedMessage(ids4[i], i);
-        hrq5.addDispatchedMessage(ids5[i], i);
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
+    };
 
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("Interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
-            ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
-            ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
-            ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
-            ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
-
-            hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
-            hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
-            hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
-            hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
-            hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = true;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          System.out.println(" size of list 1 iteration x " + size);
-          for (int i = 0; i < size; i++) {
-
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+          ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
+          ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
+          ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
+          ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
+
+          hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
+          hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
+          hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
+          hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
+          hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
         }
       }
-
-      iterator = list2.iterator();
-      doOnce = true;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          System.out.println(" size of list 2 iteration x " + size);
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = true;
+    EventID id = null;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next(); // region name;
+        int size = (Integer) iterator.next();
+        System.out.println(" size of list 1 iteration x " + size);
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
         }
       }
+    }
 
-      assertTrue(" Expected the map size to be " + (numberOfIterations * 2 * 5) + " but it is "
-          + map.size(), map.size() == (numberOfIterations * 2 * 5));
+    iterator = list2.iterator();
+    doOnce = true;
 
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next(); // region name;
+        int size = (Integer) iterator.next();
+        System.out.println(" size of list 2 iteration x " + size);
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+        }
+      }
     }
+
+    assertThat(" Expected the map size to be " + numberOfIterations * 2 * 5 + " but it is " + map.size(), map.size(), is(numberOfIterations * 2 * 5));
   }
 
   /**
-   * Concurrent Peek on Blokcing Queue waiting with for a Put . If concurrent take is also happening
+   * Concurrent Peek on Blocking Queue waiting with for a Put . If concurrent take is also happening
    * such that the object is removed first then the peek should block & not return with null.
    */
   @Test
-  public void testBlockingQueueForConcurrentPeekAndTake() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final TestBlockingHARegionQueue bQ =
-          new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", cache);
-      Thread[] threads = new Thread[3];
-      for (int i = 0; i < 3; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockingQueueForConcurrentPeekAndTake() throws Exception {
+    TestBlockingHARegionQueue regionQueue = new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", this.cache);
+    Thread[] threads = new Thread[3];
 
-              if (totalTime < 4000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 4000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+    for (int i = 0; i < 3; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError("Failed :  failed since object was null and was not expected to be null"));
             }
-          }
-        };
-
-      }
-
-      for (int k = 0; k < 3; k++) {
-        threads[k].start();
-      }
-      Thread.sleep(4000);
+            long totalTime = System.currentTimeMillis() - startTime;
 
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      EventID id1 = new EventID(new byte[] {1}, 1, 2);
+            if (totalTime < 4000) {
+              errorCollector.addError(new AssertionError("Failed :  Expected time to be greater than 4000 but it is not so"));
+            }
+          } catch (Exception e) {
+            errorCollector.addError(e);
+          }
+        }
+      };
+    }
 
-      bQ.takeFirst = true;
-      bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+    for (int k = 0; k < 3; k++) {
+      threads[k].start();
+    }
 
-      Thread.sleep(2000);
+    Thread.sleep(4000);
 
-      bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    EventID id1 = new EventID(new byte[] {1}, 1, 2);
 
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < 3; k++) {
-        ThreadUtils.join(threads[k], 180 * 1000);
-      }
+    regionQueue.takeFirst = true;
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
 
-      long totalTime = System.currentTimeMillis() - startTime;
+    Thread.sleep(2000);
 
-      if (totalTime >= 180000) {
-        fail(" Test taken too long ");
-      }
+    regionQueue.put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
 
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
+    long startTime = System.currentTimeMillis();
+    for (int k = 0; k < 3; k++) {
+      ThreadUtils.join(threads[k], 180 * 1000);
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+    long totalTime = System.currentTimeMillis() - startTime;
+    if (totalTime >= 180000) {
+      fail(" Test taken too long ");
     }
   }
 
@@ -1373,71 +1056,56 @@ public class HARegionQueueJUnitTest {
    * QRM thread , the peek should block correctly.
    */
   @Test
-  public void testBlockingQueueForTakeWhenPeekInProgress() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final TestBlockingHARegionQueue bQ =
-          new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", cache);
-      Thread[] threads = new Thread[3];
-      for (int i = 0; i < 3; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockingQueueForTakeWhenPeekInProgress() throws Exception {
+    TestBlockingHARegionQueue regionQueue = new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", this.cache);
+    Thread[] threads = new Thread[3];
 
-              if (totalTime < 4000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 4000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+    for (int i = 0; i < 3; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError("Failed :  failed since object was null and was not expected to be null"));
             }
-          }
-        };
-      }
+            long totalTime = System.currentTimeMillis() - startTime;
 
-      for (int k = 0; k < 3; k++) {
-        threads[k].start();
-      }
-      Thread.sleep(4000);
-
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      EventID id1 = new EventID(new byte[] {1}, 1, 2);
+            if (totalTime < 4000) {
+              errorCollector.addError(new AssertionError("Failed :  Expected time to be greater than 4000 but it is not so"));
+            }
+          } catch (Exception e) {
+            errorCollector.addError(e);
+          }
+        }
+      };
+    }
 
-      bQ.takeWhenPeekInProgress = true;
-      bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+    for (int k = 0; k < 3; k++) {
+      threads[k].start();
+    }
 
-      Thread.sleep(2000);
+    Thread.sleep(4000);
 
-      bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    EventID id1 = new EventID(new byte[] {1}, 1, 2);
 
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < 3; k++) {
-        ThreadUtils.join(threads[k], 60 * 1000);
-      }
+    regionQueue.takeWhenPeekInProgress = true;
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
 
-      long totalTime = System.currentTimeMillis() - startTime;
+    Thread.sleep(2000);
 
-      if (totalTime >= 60000) {
-        fail(" Test taken too long ");
-      }
+    regionQueue.put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
 
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
+    long startTime = System.currentTimeMillis();
+    for (int k = 0; k < 3; k++) {
+      ThreadUtils.join(threads[k], 60 * 1000);
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+    long totalTime = System.currentTimeMillis() - startTime;
+    if (totalTime >= 60000) {
+      fail(" Test taken too long ");
     }
   }
 
@@ -1451,138 +1119,86 @@ public class HARegionQueueJUnitTest {
    * violation. This test will validate that behaviour
    */
   @Test
-  public void testConcurrentEventExpiryAndTake() {
-    try {
-      HARegionQueueAttributes haa = new HARegionQueueAttributes();
-      haa.setExpiryTime(3);
-      final RegionQueue regionqueue =
-          new HARegionQueue.TestOnlyHARegionQueue("testing", cache, haa) {
-            CacheListener createCacheListenerForHARegion() {
-
-              return new CacheListenerAdapter() {
-
-                public void afterInvalidate(EntryEvent event) {
-
-                  if (event.getKey() instanceof Long) {
-                    synchronized (HARegionQueueJUnitTest.this) {
-                      expiryCalled = true;
-                      HARegionQueueJUnitTest.this.notify();
-
-                    } ;
-                    Thread.yield();
-
-                    synchronized (HARegionQueueJUnitTest.this) {
-                      if (!allowExpiryToProceed) {
-                        try {
-                          HARegionQueueJUnitTest.this.wait();
-                        } catch (InterruptedException e1) {
-                          encounteredException = true;
-                        }
-                      }
-                    }
-                    try {
-                      expireTheEventOrThreadIdentifier(event);
-                    } catch (CacheException e) {
-                      e.printStackTrace();
-                      encounteredException = true;
-                    } finally {
-                      synchronized (HARegionQueueJUnitTest.this) {
-                        complete = true;
-                        HARegionQueueJUnitTest.this.notify();
-                      }
-                    }
+  public void testConcurrentEventExpiryAndTake() throws Exception {
+    AtomicBoolean complete = new AtomicBoolean(false);
+    AtomicBoolean expiryCalled = new AtomicBoolean(false);
+    AtomicBoolean allowExpiryToProceed = new AtomicBoolean(false);
+
+    HARegionQueueAttributes haa = new HARegionQueueAttributes();
+    haa.setExpiryTime(3);
+
+    RegionQueue regionqueue = new HARegionQueue.TestOnlyHARegionQueue(this.testName.getMethodName(), this.cache, haa) {
+      @Override
+      CacheListener createCacheListenerForHARegion() {
+
+        return new CacheListenerAdapter() {
+
+          @Override
+          public void afterInvalidate(EntryEvent event) {
+
+            if (event.getKey() instanceof Long) {
+              synchronized (HARegionQueueJUnitTest.this) {
+                expiryCalled.set(true);
+                HARegionQueueJUnitTest.this.notifyAll();
+              }
+
+              Thread.yield();
+
+              synchronized (HARegionQueueJUnitTest.this) {
+                while (!allowExpiryToProceed.get()) {
+                  try {
+                    HARegionQueueJUnitTest.this.wait();
+                  } catch (InterruptedException e) {
+                    errorCollector.addError(e);
+                    break;
                   }
                 }
-              };
+              }
+
+              try {
+                expireTheEventOrThreadIdentifier(event);
+              } catch (CacheException e) {
+                errorCollector.addError(e);
+              } finally {
+                synchronized (HARegionQueueJUnitTest.this) {
+                  complete.set(true);
+                  HARegionQueueJUnitTest.this.notifyAll();
+                }
+              }
             }
-          };
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+          }
+        };
+      }
+    };
 
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    regionqueue.put(cf1);
 
-      regionqueue.put(cf1);
-      synchronized (this) {
-        if (!expiryCalled) {
-          this.wait();
-        }
+    synchronized (this) {
+      while (!expiryCalled.get()) {
+        wait();
       }
-      try {
-        Object o = regionqueue.take();
-        assertNull(o);
-      } catch (Exception e) {
-        throw new AssertionError("Test failed due to exception ", e);
-      } finally {
-        synchronized (this) {
-          this.allowExpiryToProceed = true;
-          this.notify();
-        }
-      }
-      synchronized (this) {
-        if (!this.complete) {
-          this.wait();
-        }
-      }
-      assertTrue("Test failed due to exception ", !encounteredException);
-    } catch (Exception e) {
-      throw new AssertionError("test failed due to ", e);
     }
-  }
 
-  /**
-   * This test validates that if sequence violation occurs without 

<TRUNCATED>