You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/01 19:49:19 UTC

[06/36] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 7ee5a8d..9c2ea23 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -14,8 +14,52 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.MirrorType;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.query.internal.CqQueryVsdStats;
 import org.apache.geode.cache.query.internal.cq.CqService;
@@ -30,9 +74,20 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.internal.cache.tier.sockets.*;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.cache.tier.sockets.HandShake;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -41,17 +96,6 @@ import org.apache.geode.internal.util.concurrent.StoppableCondition;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.*;
 
 /**
  * An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be
@@ -72,26 +116,23 @@ import java.util.concurrent.locks.*;
  * 
  * 30 May 2008: 5.7 onwards the underlying GemFire Region will continue to have key as counter(long)
  * but the value will be a wrapper object(HAEventWrapper) which will be a key in a separate data
- * strucure called haContainer (an implementation of Map). The value against this wrapper will be
+ * structure called haContainer (an implementation of Map). The value against this wrapper will be
  * the offered object in the queue. The purpose of this modification is to allow multiple
  * ha-region-queues share their offered values without storing separate copies in memory, upon GII.
  * 
  * (See BlockingHARegionQueue)
  * 
- * 
  * @since GemFire 4.3
- * 
  */
 public class HARegionQueue implements RegionQueue {
   private static final Logger logger = LogService.getLogger();
 
-  /** The <code>Region</code> backing this queue */
+  /** The {@code Region} backing this queue */
   protected HARegion region;
 
   /**
-   * The key into the <code>Region</code> used when putting entries onto the queue. The counter uses
+   * The key into the {@code Region} used when putting entries onto the queue. The counter uses
    * incrementAndGet so counter will always be started from 1
-   * 
    */
   protected final AtomicLong tailKey = new AtomicLong(0);
 
@@ -100,26 +141,21 @@ public class HARegionQueue implements RegionQueue {
    * object. Every add operation will be identified by the ThreadIdentifier object & the position
    * recorded in the LastDispatchedAndCurrentEvents object.
    */
-
   protected final ConcurrentMap eventsMap = new ConcurrentHashMap();
 
   /**
-   * The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
-   * updating of entries in the queue for conflation.
+   * The {@code Map} mapping the regionName->key to the queue key. This index allows fast updating
+   * of entries in the queue for conflation.
    */
   protected volatile Map indexes = Collections.unmodifiableMap(new HashMap());
 
-  // TODO:Asif: Should we worry about whether to some how make it writer
-  // preference?
-  /** Lock object for updating the queue size by different operations */
-  // private final Object SIZE_LOCK = new Object();
   private final StoppableReentrantReadWriteLock rwLock;
 
   private final StoppableReentrantReadWriteLock.StoppableReadLock readLock;
 
   private final StoppableWriteLock writeLock;
 
-  /** The name of the <code>Region</code> backing this queue */
+  /** The name of the {@code Region} backing this queue */
   private final String regionName;
 
   /** The ClientProxyMembershipID associated with the ha queue */
@@ -151,10 +187,7 @@ public class HARegionQueue implements RegionQueue {
    * A sequence violation can occur , if an HARegionQueue receives events thru GII & the same event
    * also arrives via Gemfire Put in that local VM. If the HARegionQueue does not receive any data
    * via GII , then there should not be any violation. If there is data arriving thru GII, such
-   * voiolations can be expected , but should be analyzed thoroughly.
-   * 
-   * <p>
-   * author Asif
+   * violations can be expected , but should be analyzed thoroughly.
    */
   protected boolean puttingGIIDataInQueue;
 
@@ -166,14 +199,12 @@ public class HARegionQueue implements RegionQueue {
 
   /**
    * a thread local to store the counters corresponding to the events peeked by a particular thread.
-   * When <code>remove()</code> will be called, these events stored in thread-local will be
-   * destroyed.
+   * When {@code remove()} will be called, these events stored in thread-local will be destroyed.
    */
   protected static final ThreadLocal peekedEventsContext = new ThreadLocal();
 
   /**
-   * Thread which creates the <code>QueueRemovalMessage</code> and sends it to other nodes in the
-   * system
+   * Thread which creates the {@code QueueRemovalMessage} and sends it to other nodes in the system
    */
   private static QueueRemovalThread qrmThread;
 
@@ -284,30 +315,18 @@ public class HARegionQueue implements RegionQueue {
   protected long maxQueueSizeHitCount = 0;
 
   /**
-   * 
    * Processes the given string and returns a string which is allowed for region names
    * 
-   * @param regionName
    * @return legal region name
    */
   public static String createRegionName(String regionName) {
-    String result = regionName.replace('/', '#'); // [yogi]: region name cannot
-    // contain the separator '/'
-    return result;
+    return regionName.replace('/', '#');
   }
 
   /**
-   * 
-   * @param regionName
-   * @param cache
    * @param isPrimary whether this is the primary queue for a client
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
    */
-
-  protected HARegionQueue(String regionName, GemFireCacheImpl cache,
+  protected HARegionQueue(String regionName, InternalCache cache,
       HARegionQueueAttributes haAttributes, Map haContainer, ClientProxyMembershipID clientProxyId,
       final byte clientConflation, boolean isPrimary)
       throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -332,13 +351,13 @@ public class HARegionQueue implements RegionQueue {
     this.readLock = this.rwLock.readLock();
     this.writeLock = this.rwLock.writeLock();
 
-    this.putGIIDataInRegion();
+    putGIIDataInRegion();
     if (this.getClass() == HARegionQueue.class) {
       initialized.set(true);
     }
   }
 
-  private void createHARegion(String processedRegionName, GemFireCacheImpl cache)
+  private void createHARegion(String processedRegionName, InternalCache cache)
       throws IOException, ClassNotFoundException {
     AttributesFactory af = new AttributesFactory();
     af.setMirrorType(MirrorType.KEYS_VALUES);
@@ -358,7 +377,7 @@ public class HARegionQueue implements RegionQueue {
    * reinitialize the queue, presumably pulling current information from seconaries
    */
   public void reinitializeRegion() {
-    GemFireCacheImpl cache = this.region.getCache();
+    InternalCache cache = this.region.getCache();
     String regionName = this.region.getName();
     this.region.destroyRegion();
     Exception problem = null;
@@ -412,7 +431,7 @@ public class HARegionQueue implements RegionQueue {
         // use putIfAbsent to avoid overwriting newer dispatch information
         Object o = this.eventsMap.putIfAbsent(entry.getKey(), giiDace);
         if (o != null && isDebugEnabled_BS) {
-          sb.append(" -- could not store.  found " + o);
+          sb.append(" -- could not store.  found ").append(o);
         }
       }
     }
@@ -425,11 +444,8 @@ public class HARegionQueue implements RegionQueue {
    * Repopulates the HARegion after the GII is over so as to reset the counters and populate the
    * DACE objects for the thread identifiers . This method should be invoked as the last method in
    * the constructor . Thus while creating BlockingQueue this method should be invoked lastly in the
-   * derived class contructor , after the HARegionQueue contructor is complete. Otherwise, the
+   * derived class constructor , after the HARegionQueue contructor is complete. Otherwise, the
    * ReentrantLock will be null.
-   * 
-   * @throws CacheException
-   * @throws InterruptedException
    */
   void putGIIDataInRegion() throws CacheException, InterruptedException {
     Set entrySet = this.region.entries(false);
@@ -498,8 +514,6 @@ public class HARegionQueue implements RegionQueue {
    * Puts the GII'd entry into the ha region, if it was GII'd along with its ClientUpdateMessageImpl
    * instance.
    * 
-   * @param val
-   * @throws InterruptedException
    * @since GemFire 5.7
    */
   protected void putInQueue(Object val) throws InterruptedException {
@@ -507,7 +521,7 @@ public class HARegionQueue implements RegionQueue {
       if (logger.isDebugEnabled()) {
         logger.debug(
             "HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.",
-            ((HAEventWrapper) val).getKeyToConflate());
+            ((Conflatable) val).getKeyToConflate());
       }
     } else {
       this.put(val);
@@ -567,9 +581,6 @@ public class HARegionQueue implements RegionQueue {
    * object & SIZE Lock
    * 
    * @param object object to put onto the queue
-   * @throws InterruptedException
-   * @throws CacheException
-   * @return boolean
    */
   public boolean put(Object object) throws CacheException, InterruptedException {
     this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
@@ -663,11 +674,9 @@ public class HARegionQueue implements RegionQueue {
         dace = oldDace;
       } else {
         // Add the recently added ThreadIdentifier to the RegionQueue for expiry
-        this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+        this.region.put(ti, dace.lastDispatchedSequenceId);
         // update the stats
-        // if (logger.isDebugEnabled()) {
         this.stats.incThreadIdentifiers();
-        // }
       }
       if (!dace.putObject(event, sequenceID)) {
         this.put(object);
@@ -677,11 +686,6 @@ public class HARegionQueue implements RegionQueue {
         }
       }
     }
-    // update the stats
-    // if (logger.isDebugEnabled()) {
-    // this.stats.incEventsEnqued();
-    // }
-
   }
 
   /**
@@ -691,7 +695,7 @@ public class HARegionQueue implements RegionQueue {
    */
   public void startGiiQueueing() {
     this.giiLock.writeLock().lock();
-    this.giiCount++;
+    this.giiCount++; // TODO: non-atomic operation on volatile!
     if (logger.isDebugEnabled()) {
       logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount);
     }
@@ -710,7 +714,7 @@ public class HARegionQueue implements RegionQueue {
     this.giiLock.writeLock().lock();
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
-      this.giiCount--;
+      this.giiCount--; // TODO: non-atomic operation on volatile!
       if (isDebugEnabled) {
         logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount);
       }
@@ -731,15 +735,7 @@ public class HARegionQueue implements RegionQueue {
           Object value;
           try {
             value = this.giiQueue.remove();
-          } catch (NoSuchElementException e) {
-            // if (actualCount != expectedCount) {
-            // logger.severe(LocalizedStrings.DEBUG, "expected to drain "
-            // + expectedCount + " messages but drained " + actualCount
-            // + " queue.size() is now " + giiQueue.size() + ", in queue" + this, e);
-            // } else {
-            // logger.severe(LocalizedStrings.DEBUG, "drained " + actualCount + " messages. Queue
-            // size is " + giiQueue.size() + " in " + this);
-            // }
+          } catch (NoSuchElementException ignore) {
             break;
           }
           actualCount++;
@@ -765,13 +761,11 @@ public class HARegionQueue implements RegionQueue {
             if (value instanceof HAEventWrapper) {
               decAndRemoveFromHAContainer((HAEventWrapper) value);
             }
-          } catch (NoSuchElementException e) {
+          } catch (NoSuchElementException ignore) {
             break;
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ignore) {
             // complete draining while holding the write-lock so nothing else
             // can get into the queue
-            // logger.severe(LocalizedStrings.DEBUG, "endGiiQueueing interrupted - ignoring until
-            // draining completes");
             interrupted = true;
           }
         }
@@ -804,19 +798,19 @@ public class HARegionQueue implements RegionQueue {
    */
   public Map getEventMapForGII() {
     // fix for bug #41621 - concurrent modification exception while serializing event map
-    Map<ThreadIdentifier, DispatchedAndCurrentEvents> events = this.eventsMap;
     final boolean isDebugEnabled = logger.isDebugEnabled();
     do {
       HashMap result = new HashMap();
       try {
-        for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : events.entrySet()) {
+        for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : ((Map<ThreadIdentifier, DispatchedAndCurrentEvents>) this.eventsMap)
+            .entrySet()) {
           if (entry.getValue().isCountersEmpty()) {
             result.put(entry.getKey(), entry.getValue());
           }
         }
         return result;
-      } catch (ConcurrentModificationException e) { // TODO:WTF: bad practice but eventsMap is
-                                                    // ConcurrentHashMap
+      } catch (ConcurrentModificationException ignore) {
+        // TODO:WTF: bad practice but eventsMap is ConcurrentHashMap
         if (isDebugEnabled) {
           logger.debug(
               "HARegion encountered concurrent modification exception while analysing event state - will try again");
@@ -826,9 +820,7 @@ public class HARegionQueue implements RegionQueue {
   }
 
   /**
-   * Implementation in BlokcingHARegionQueue class
-   * 
-   * @throws InterruptedException
+   * Implementation in BlockingHARegionQueue class
    */
   void checkQueueSizeConstraint() throws InterruptedException {
     if (Thread.interrupted())
@@ -846,13 +838,11 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Creates the static dispatchedMessagesMap (if not present) and starts the QueuRemovalThread if
    * not running
-   * 
    */
-  public static synchronized void startHAServices(GemFireCacheImpl c) {
-
+  static synchronized void startHAServices(InternalCache cache) {
     if (qrmThread == null) {
       dispatchedMessagesMap = new ConcurrentHashMap();
-      qrmThread = new QueueRemovalThread(c);
+      qrmThread = new QueueRemovalThread(cache);
       qrmThread.setName("Queue Removal Thread");
       qrmThread.start();
     }
@@ -913,20 +903,16 @@ public class HARegionQueue implements RegionQueue {
       }
     }
     Object key = event.getKeyToConflate();
-    Long previousPosition = (Long) latestIndexesForRegion.put(key, newPosition);
-    return previousPosition;
-
+    return (Long) latestIndexesForRegion.put(key, newPosition);
   }
 
   /**
-   * 
    * Creates and returns a ConcurrentMap. This method is over-ridden in test classes to test some
    * functionality
    * 
    * @return new ConcurrentMap
    */
   ConcurrentMap createConcurrentMap() {
-
     return new ConcurrentHashMap();
   }
 
@@ -948,7 +934,7 @@ public class HARegionQueue implements RegionQueue {
           // if (!HARegionQueue.this.isPrimary()) {
           HARegionQueue.this.expireTheEventOrThreadIdentifier(event);
           // }
-        } catch (CancelException e) {
+        } catch (CancelException ignore) {
           // ignore, we're done
         } catch (CacheException ce) {
           if (!destroyInProgress) {
@@ -967,11 +953,8 @@ public class HARegionQueue implements RegionQueue {
    * overridden function createCacheListenerForHARegion of the HARegionQueueJUnitTest class for the
    * test testConcurrentEventExpiryAndTake. This function provides the meaningful functionality for
    * expiry of the Event object as well as ThreadIdentifier
-   * <p>
-   * author Asif
-   * 
+   *
    * @param event event object representing the data being expired
-   * @throws CacheException
    */
   void expireTheEventOrThreadIdentifier(EntryEvent event) throws CacheException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -980,11 +963,6 @@ public class HARegionQueue implements RegionQueue {
           "HARegionQueue::afterInvalidate. Entry Event being invalidated:{}, isPrimaryQueue:{}",
           event, HARegionQueue.this.isPrimary());
     }
-    // if (HARegionQueue.this.isPrimary()) {
-    // logger.info(LocalizedStrings.DEBUG,
-    // "HARegionQueue: Entry Event being invalidated ="
-    // + event+", after current queue became primary.");
-    // }
     Object key = event.getKey();
     if (key instanceof ThreadIdentifier) {
       // Check if the sequenceID present as value against this key is same
@@ -998,7 +976,7 @@ public class HARegionQueue implements RegionQueue {
           (DispatchedAndCurrentEvents) HARegionQueue.this.eventsMap.get(key);
       Assert.assertTrue(dace != null);
       Long expirySequenceID = (Long) event.getOldValue();
-      boolean expired = dace.expireOrUpdate(expirySequenceID.longValue(), (ThreadIdentifier) key);
+      boolean expired = dace.expireOrUpdate(expirySequenceID, (ThreadIdentifier) key);
       if (isDebugEnabled) {
         logger.debug(
             "HARegionQueue::afterInvalidate:Size of the region after expiring or updating the ThreadIdentifier={}",
@@ -1028,21 +1006,18 @@ public class HARegionQueue implements RegionQueue {
 
   /**
    * This method adds the position of newly added object to the List of available IDs so that it is
-   * avaialble for peek or take. This method is called from DispatchedAndCurrentEvents object. This
+   * available for peek or take. This method is called from DispatchedAndCurrentEvents object. This
    * method is invoked in a write lock for non blocking queue & in a reentrant lock in a blocking
-   * queue. In case of blokcing queue , this method also signals the waiting take & peek threads to
+   * queue. In case of blocking queue , this method also signals the waiting take & peek threads to
    * awake.
-   * <p>
-   * author Asif
-   * 
+   *
    * @param position The Long position of the object which has been added
-   * @throws InterruptedException
    */
   void publish(Long position) throws InterruptedException {
     acquireWriteLock();
     try {
       this.idsAvailable.add(position);
-      // Asif:Notify the wiating peek threads or take threads of blocking queue
+      // Notify the waiting peek threads or take threads of blocking queue
       // A void operation for the non blocking queue operations
       notifyPeekAndTakeThreads();
     } finally {
@@ -1055,12 +1030,8 @@ public class HARegionQueue implements RegionQueue {
   }
 
   /**
-   * 
-   * <p>
-   * author Asif
-   * 
    * @param position Long value present in the Available IDs map against which Event object is
-   *        present in HARegion. This function is directly ivnoked from the basicInvalidate function
+   *        present in HARegion. This function is directly invoked from the basicInvalidate function
    *        where expiry is aborted if this function returns false
    * @return boolean true if the position could be removed from the Set
    * @throws InterruptedException *
@@ -1087,12 +1058,10 @@ public class HARegionQueue implements RegionQueue {
    * in the AvailableID Set. If the position existed in the Set, then only it is removed from the
    * Set & the underlying Region
    * 
-   * @param position Long poistion counter for entry in the Region
+   * @param position Long position counter for entry in the Region
    * @return true if the entry with <br>
    *         position <br>
    *         specified was removed from the Set
-   * @throws InterruptedException
-   * 
    */
   protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException {
     boolean removedOK = this.destroyFromAvailableIDs(position);
@@ -1100,9 +1069,7 @@ public class HARegionQueue implements RegionQueue {
     if (removedOK) {
       try {
         this.destroyFromQueue(position);
-      } catch (EntryNotFoundException enfe) {
-        // if (!this.region.isDestroyed()) {
-        // if (!HARegionQueue.this.destroyInProgress || !this.region.isDestroyed()) {
+      } catch (EntryNotFoundException ignore) {
         if (!HARegionQueue.this.destroyInProgress) {
           if (!this.region.isDestroyed()) {
             Assert.assertTrue(false, "HARegionQueue::remove: The position " + position
@@ -1128,7 +1095,7 @@ public class HARegionQueue implements RegionQueue {
     maintainCqStats(event, -1);
   }
 
-  /** Returns the <code>toString</code> for this RegionQueue object */
+  /** Returns the {@code toString} for this RegionQueue object */
   @Override
   public String toString() {
     return "RegionQueue on " + this.regionName + "(" + (this.isPrimary ? "primary" : "backup")
@@ -1144,12 +1111,9 @@ public class HARegionQueue implements RegionQueue {
 
 
   /**
-   * This method is inoked by the take function . For non blocking queue it returns null or a valid
+   * This method is invoked by the take function . For non blocking queue it returns null or a valid
    * long position while for blocking queue it waits for data in the queue or throws Exception if
    * the thread encounters exception while waiting.
-   * 
-   * @throws CacheException
-   * @throws InterruptedException
    */
   protected Long getAndRemoveNextAvailableID() throws InterruptedException {
     Long next = null;
@@ -1177,14 +1141,9 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Returns the next position counter present in idsAvailable set. This method is invoked by the
    * peek function. In case of BlockingQueue, this method waits till a valid ID is available.
-   * 
-   * <p>
-   * author Asif
-   * 
+   *
    * @return valid Long poistion or null depending upon the nature of the queue
-   * @throws InterruptedException
    * @throws TimeoutException if operation is interrupted (unfortunately)
-   * 
    */
   private Long getNextAvailableID() throws InterruptedException {
     Long next = null;
@@ -1208,7 +1167,6 @@ public class HARegionQueue implements RegionQueue {
    * For non blocking queue , this method either returns null or an Object. For blocking queue it
    * will always return with an Object or wait for queue to be populated.
    * 
-   * @throws InterruptedException
    * @throws CacheException The exception can be thrown by BlockingQueue if it encounters
    *         InterruptedException while waiting for data
    */
@@ -1281,8 +1239,6 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Removes the events that were peeked by this thread. The events are destroyed from the queue and
    * conflation map and DispatchedAndCurrentEvents are updated accordingly.
-   * 
-   * @throws InterruptedException
    */
   public void remove() throws InterruptedException {
     List peekedIds = (List) HARegionQueue.peekedEventsContext.get();
@@ -1324,10 +1280,10 @@ public class HARegionQueue implements RegionQueue {
         List countersList;
         if ((countersList = (List) groupedThreadIDs.get(threadid)) != null) {
           countersList.add(info);
-          countersList.set(0, Long.valueOf(sequenceId));
+          countersList.set(0, sequenceId);
         } else {
           countersList = new ArrayList();
-          countersList.add(Long.valueOf(sequenceId));
+          countersList.add(sequenceId);
           countersList.add(info);
           groupedThreadIDs.put(threadid, countersList);
         }
@@ -1344,7 +1300,7 @@ public class HARegionQueue implements RegionQueue {
       Map.Entry element = (Map.Entry) iter.next();
       ThreadIdentifier tid = (ThreadIdentifier) element.getKey();
       List removedEvents = (List) element.getValue();
-      long lastDispatchedId = ((Long) removedEvents.remove(0)).longValue();
+      long lastDispatchedId = (Long) removedEvents.remove(0);
       DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(tid);
       if (dace != null && dace.lastDispatchedSequenceId < lastDispatchedId) {
         try {
@@ -1387,7 +1343,7 @@ public class HARegionQueue implements RegionQueue {
         if (next == null) {
           break;
         }
-      } catch (TimeoutException te) {
+      } catch (TimeoutException ignore) {
         throw new InterruptedException();
       }
       object = (Conflatable) this.region.get(next);
@@ -1459,8 +1415,6 @@ public class HARegionQueue implements RegionQueue {
    * @param timeToWait The number of milliseconds to attempt to peek
    * 
    * @return The list of events peeked
-   * @throws InterruptedException
-   *
    */
   public List peek(int batchSize, int timeToWait) throws InterruptedException {
     long start = System.currentTimeMillis();
@@ -1506,7 +1460,7 @@ public class HARegionQueue implements RegionQueue {
       boolean interrupted = Thread.interrupted();
       try {
         Thread.sleep(50); // TODO this seems kinda busy IMNSHO -- jason
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         interrupted = true;
         this.region.getCancelCriterion().checkCancelInProgress(null);
       } finally {
@@ -1520,11 +1474,10 @@ public class HARegionQueue implements RegionQueue {
   /**
    * This method prepares the batch of events and updates the thread-context with corresponding
    * counters, so that when remove is called by this thread, these events are destroyed from the
-   * queue.This method should always be invoked within the <code>rwLock</code>.
+   * queue.This method should always be invoked within the {@code rwLock}.
    * 
    * @param batchSize - number of events to be peeked
    * @return - list of events peeked
-   * @throws CacheException
    */
   private List getBatchAndUpdateThreadContext(int batchSize) {
     Iterator itr = this.idsAvailable.iterator();
@@ -1559,26 +1512,22 @@ public class HARegionQueue implements RegionQueue {
   }
 
   public void addCacheListener(CacheListener listener) {
-    // TODO Auto-generated method stub
-
+    // nothing
   }
 
   public void removeCacheListener() {
-    // TODO Auto-generated method stub
+    // nothing
   }
 
   /**
    * It adds the entry to a static data structure dispatchedMessagesMap which is periodically
    * operated upon by the QRM thread.
-   * 
-   * <p>
-   * author Asif
-   * 
+   *
    * @param tid - the ThreadIdentifier object for this event
    * @param sequenceId - the sequence id for this event
    */
   public void addDispatchedMessage(ThreadIdentifier tid, long sequenceId) {
-    Long lastSequenceNumber = Long.valueOf(sequenceId);
+    Long lastSequenceNumber = sequenceId;
     boolean wasEmpty = false;
     Long oldvalue = null;
     Map internalMap = null;
@@ -1734,16 +1683,10 @@ public class HARegionQueue implements RegionQueue {
    * creates a DACE. Only one QRM operates at a time on a DACE & any other mesasge will be waiting
    * for the current thread to exit. This is accomplished by taking a lock on QRM_LOCK object in the
    * DACE.
-   * 
-   * <p>
-   * author Asif
-   * 
+   *
    * @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence
    *        Id
-   * @throws CacheException
-   * @throws InterruptedException
    */
-
   void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException {
     ThreadIdentifier ti = getThreadIdentifier(lastDispatched);
     long sequenceID = lastDispatched.getSequenceID();
@@ -1764,11 +1707,9 @@ public class HARegionQueue implements RegionQueue {
       } else {
         // Add the recently added ThreadIdentifier to the RegionQueue for
         // expiry
-        this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+        this.region.put(ti, dace.lastDispatchedSequenceId);
         // update the stats
-        // if (logger.isDebugEnabled()) {
         this.stats.incThreadIdentifiers();
-        // }
       }
     }
   }
@@ -1778,7 +1719,6 @@ public class HARegionQueue implements RegionQueue {
    * 
    * @return the size of the queue
    */
-
   public int size() {
     acquireReadLock();
     try {
@@ -1788,12 +1728,8 @@ public class HARegionQueue implements RegionQueue {
     }
   }
 
-  void decrementTakeSidePutPermits() {
-
-  }
-
   void incrementTakeSidePutPermits() {
-
+    // nothing
   }
 
   // called from dace on a put.
@@ -1929,13 +1865,8 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Always returns false for a HARegionQueue class. Suitably overridden in BlockingHARegionQueue
    * class.
-   * 
-   * <p>
-   * author Asif
-   * 
+   *
    * @return false for HAREgionQueue as this is a non blocking class
-   * @throws InterruptedException
-   * 
    */
   boolean waitForData() throws InterruptedException {
     return false;
@@ -1976,12 +1907,8 @@ public class HARegionQueue implements RegionQueue {
    * @param cache Gemfire Cache instance
    * @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking
    * @return an instance of HARegionQueue
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws CacheException
    */
-  public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+  public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
       final int haRgnQType, final boolean isDurable)
       throws IOException, ClassNotFoundException, CacheException, InterruptedException {
     Map container = null;
@@ -1993,7 +1920,7 @@ public class HARegionQueue implements RegionQueue {
       container = new HashMap();
     }
 
-    return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache,
+    return getHARegionQueueInstance(regionName, cache,
         HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haRgnQType, isDurable, container, null,
         HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
   }
@@ -2009,12 +1936,8 @@ public class HARegionQueue implements RegionQueue {
    * @param isPrimary whether this is the primary queue for the client
    * @param canHandleDelta boolean indicating whether the HARegionQueue can handle delta or not
    * @return an instance of HARegionQueue
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
    */
-  public static HARegionQueue getHARegionQueueInstance(String regionName, GemFireCacheImpl cache,
+  public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
       HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable, Map haContainer,
       ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
       boolean canHandleDelta)
@@ -2038,12 +1961,11 @@ public class HARegionQueue implements RegionQueue {
       default:
         throw new IllegalArgumentException(
             LocalizedStrings.HARegionQueue_HARGNQTYPE_CAN_EITHER_BE_BLOCKING_0_OR_NON_BLOCKING_1
-                .toLocalizedString(new Object[] {Integer.valueOf(BLOCKING_HA_QUEUE),
-                    Integer.valueOf(NON_BLOCKING_HA_QUEUE)}));
+                .toLocalizedString(new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE}));
     }
     if (!isDurable) {
       Integer expiryTime = Integer.getInteger(REGION_ENTRY_EXPIRY_TIME, hrqa.getExpiryTime());
-      hrqa.setExpiryTime(expiryTime.intValue());
+      hrqa.setExpiryTime(expiryTime);
       ExpirationAttributes ea =
           new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE);
       hrq.region.getAttributesMutator().setEntryTimeToLive(ea);
@@ -2054,19 +1976,10 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Creates a HARegionQueue object with default attributes. used by tests
    * 
-   * @param regionName
-   * @param cache
-   * @param hrqa
-   * @param haRgnQType
-   * @param isDurable
    * @return an instance of HARegionQueue
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
    * @since GemFire 5.7
    */
-  public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+  public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
       HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable)
       throws IOException, ClassNotFoundException, CacheException, InterruptedException {
     Map container = null;
@@ -2078,8 +1991,8 @@ public class HARegionQueue implements RegionQueue {
       container = new HashMap();
     }
 
-    return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache, hrqa, haRgnQType,
-        isDurable, container, null, HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
+    return getHARegionQueueInstance(regionName, cache, hrqa, haRgnQType, isDurable, container, null,
+        HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
   }
 
   public boolean isEmptyAckList() {
@@ -2122,7 +2035,7 @@ public class HARegionQueue implements RegionQueue {
                 if (this.destroyFromAvailableIDsAndRegion(counter)) {
                   stats.incEventsRemoved();
                 }
-              } catch (InterruptedException e) {
+              } catch (InterruptedException ignore) {
                 Thread.currentThread().interrupt();
               }
             }
@@ -2134,18 +2047,14 @@ public class HARegionQueue implements RegionQueue {
     }
   }
 
-
-
   /**
-   * This is an implemention of RegionQueue where peek() & take () are blocking operation and will
+   * This is an implementation of RegionQueue where peek() & take () are blocking operation and will
    * not return unless it gets some legitimate value The Lock object used by this class is a
    * ReentrantLock & not a ReadWriteLock as in the base class. This reduces the concurrency of peek
    * operations, but it enables the condition object of the ReentrantLock used to guard the
    * idsAvailable Set for notifying blocking peek & take operations. Previously a separate Lock
    * object was used by the BlockingQueue for wait notify. This class will be performant if there is
    * a single peek thread.
-   * 
-   * 
    */
   private static class BlockingHARegionQueue extends HARegionQueue {
     /**
@@ -2180,19 +2089,11 @@ public class HARegionQueue implements RegionQueue {
     protected final StoppableCondition blockCond;
 
     /**
-     * 
-     * @param regionName
-     * @param cache
      * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can
      *        be set
-     * @param haContainer
      * @param isPrimary whether this is the primary queue for a client
-     * @throws IOException TODO-javadocs
-     * @throws ClassNotFoundException TODO-javadocs
-     * @throws CacheException TODO-javadocs
-     * @throws InterruptedException
      */
-    protected BlockingHARegionQueue(String regionName, GemFireCacheImpl cache,
+    protected BlockingHARegionQueue(String regionName, InternalCache cache,
         HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
         final byte clientConflation, boolean isPrimary)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2231,12 +2132,9 @@ public class HARegionQueue implements RegionQueue {
      * This effectively makes the blocking queue behave like a non-blocking queue which throttles
      * puts if it reaches its capacity. This was changed in 8.1, see #51400. This function is NOOP
      * in the HARegionQueue.
-     * 
-     * <p>
-     * author Asif
      */
     @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT")
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
     void checkQueueSizeConstraint() throws InterruptedException {
       if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
         if (Thread.interrupted())
@@ -2261,8 +2159,8 @@ public class HARegionQueue implements RegionQueue {
                       this.maxQueueSizeHitCount = 0;
                     }
                     ++this.maxQueueSizeHitCount;
-                    // for (;;) {
                     this.region.checkReadiness(); // fix for bug 37581
+                    // TODO: wait called while holding two locks
                     this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
                     this.region.checkReadiness(); // fix for bug 37581
                     // Fix for #51400. Allow the queue to grow beyond its
@@ -2270,15 +2168,13 @@ public class HARegionQueue implements RegionQueue {
                     // drain the queue, either due to a slower client or the
                     // deadlock scenario mentioned in the ticket.
                     reconcilePutPermits();
-                    // }
                     if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
                       logger.info(LocalizedMessage
                           .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
                     }
                   } catch (InterruptedException ex) {
-                    // TODO:Asif: The line below is meaningless. Comment it out
-                    // later
-                    this.permitMon.notify();
+                    // TODO: The line below is meaningless. Comment it out later
+                    this.permitMon.notifyAll();
                     throw ex;
                   }
                 }
@@ -2292,14 +2188,10 @@ public class HARegionQueue implements RegionQueue {
 
     /**
      * This function should always be called under a lock on putGuard & permitMon obejct
-     * 
-     * <p>
-     * author Asif
-     * 
-     * @return int currnet Put permits
+     *
+     * @return int current Put permits
      */
     private int reconcilePutPermits() {
-
       putPermits += takeSidePutPermits;
       takeSidePutPermits = 0;
       return putPermits;
@@ -2312,10 +2204,6 @@ public class HARegionQueue implements RegionQueue {
      * added in case a put operation which has reduced the put permit optmistically but due to some
      * reason ( most likely because of duplicate event) was not added in the queue. In such case it
      * will increment take side permit without notifying any waiting thread
-     * 
-     * <p>
-     * author Asif
-     * 
      */
     @Override
     void incrementTakeSidePutPermitsWithoutNotify() {
@@ -2328,24 +2216,19 @@ public class HARegionQueue implements RegionQueue {
      * Implemented to reduce contention between concurrent take/remove operations and put . The
      * reconciliation between take side put permits & put side put permits happens only if theput
      * side put permits are exhausted. In HARehionQueue base class this is a NOOP function
-     * 
-     * <p>
-     * author Asif
-     * 
      */
     @Override
     void incrementTakeSidePutPermits() {
       if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
         synchronized (this.permitMon) {
           ++this.takeSidePutPermits;
-          this.permitMon.notify();
+          this.permitMon.notifyAll();
         }
       }
     }
 
     /**
      * Identical to the acquireReadLock as there is only one type of Lock object in this class.
-     * 
      */
     @Override
     void acquireWriteLock() {
@@ -2378,8 +2261,6 @@ public class HARegionQueue implements RegionQueue {
      * acquiring the lock on ReentrantLock object. It blocks the thread if the queue is empty or
      * returns true otherwise . This will always return true indicating that data is available for
      * retrieval or throw an Exception.It can never return false.
-     * 
-     * @throws InterruptedException
      */
     @Override
     boolean waitForData() throws InterruptedException {
@@ -2443,7 +2324,7 @@ public class HARegionQueue implements RegionQueue {
     LinkedList unremovedElements = null;
     HashMap currDurableMap = null;
 
-    protected DurableHARegionQueue(String regionName, GemFireCacheImpl cache,
+    protected DurableHARegionQueue(String regionName, InternalCache cache,
         HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
         final byte clientConflation, boolean isPrimary)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2604,11 +2485,11 @@ public class HARegionQueue implements RegionQueue {
           this.releaseWriteLock();
         }
       }
-      /**
-       * ashetkar: Setting this threadlocal variable to null has no use as the current thread never
-       * uses it. Instead it should really be set null by message dispatcher thread while starting
-       * or resuming. This was added in revision 20914. Need to check if it really needs to be
-       * thread local.
+      /*
+       * Setting this threadlocal variable to null has no use as the current thread never uses it.
+       * Instead it should really be set null by message dispatcher thread while starting or
+       * resuming. This was added in revision 20914. Need to check if it really needs to be thread
+       * local.
        */
       peekedEventsContext.set(null);
       this.threadIdToSeqId.list.clear();
@@ -2675,38 +2556,27 @@ public class HARegionQueue implements RegionQueue {
    * bridge between the user defined HARegionQueue class & the actual class. This class object will
    * be buggy as it will tend to publish the Object o QRM thread & the expiry thread before the
    * complete creation of the HARegionQueue instance
-   * 
-   * <p>
-   * author Asif
-   * 
    */
   static class TestOnlyHARegionQueue extends HARegionQueue {
     /**
      * Overloaded constructor to accept haContainer.
      * 
-     * @param regionName
-     * @param cache
-     * @param haContainer
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws CacheException
-     * @throws InterruptedException
      * @since GemFire 5.7
      */
-    TestOnlyHARegionQueue(String regionName, Cache cache, Map haContainer)
+    TestOnlyHARegionQueue(String regionName, InternalCache cache, Map haContainer)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-      this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
-          haContainer, HandShake.CONFLATION_DEFAULT, false);
+      this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haContainer,
+          HandShake.CONFLATION_DEFAULT, false);
       this.initialized.set(true);
     }
 
-    TestOnlyHARegionQueue(String regionName, Cache cache)
+    TestOnlyHARegionQueue(String regionName, InternalCache cache)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-      this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
-          new HashMap(), HandShake.CONFLATION_DEFAULT, false);
+      this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, new HashMap(),
+          HandShake.CONFLATION_DEFAULT, false);
     }
 
-    TestOnlyHARegionQueue(String regionName, GemFireCacheImpl cache, HARegionQueueAttributes hrqa,
+    TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa,
         Map haContainer, final byte clientConflation, boolean isPrimary)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
       super(regionName, cache, hrqa, haContainer, null, clientConflation, isPrimary);
@@ -2718,34 +2588,21 @@ public class HARegionQueue implements RegionQueue {
     }
 
     /**
-     * Overloaded constructor to pass an <code>HashMap</code> instance as a haContainer.
+     * Overloaded constructor to pass an {@code HashMap} instance as a haContainer.
      * 
-     * @param regionName
-     * @param cache
-     * @param hrqa
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws CacheException
-     * @throws InterruptedException
      * @since GemFire 5.7
      */
-    TestOnlyHARegionQueue(String regionName, Cache cache, HARegionQueueAttributes hrqa)
+    TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa)
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-      this(regionName, (GemFireCacheImpl) cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT,
-          false);
+      this(regionName, cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT, false);
     }
   }
 
   /**
    * This thread will check for messages which have been dispatched. After a configurable time or
-   * size is reached, it will create a new <code>QueueRemovalMessage</code> and send it to all the
-   * nodes in the DistributedSystem
-   * 
-   * <p>
-   * author Mitul Bid
-   * 
+   * size is reached, it will create a new {@code QueueRemovalMessage} and send it to all the nodes
+   * in the DistributedSystem
    */
-
   private static class QueueRemovalThread extends Thread {
 
     /**
@@ -2753,14 +2610,14 @@ public class HARegionQueue implements RegionQueue {
      */
     private volatile boolean shutdown = false;
 
-    private final GemFireCacheImpl cache;
+    private final InternalCache cache;
 
     /**
      * Constructor : Creates and initializes the thread
      */
-    public QueueRemovalThread(GemFireCacheImpl c) {
+    public QueueRemovalThread(InternalCache cache) {
       this.setDaemon(true);
-      this.cache = c;
+      this.cache = cache;
     }
 
     private boolean checkCancelled() {
@@ -2775,11 +2632,7 @@ public class HARegionQueue implements RegionQueue {
 
     /**
      * The thread will check the dispatchedMessages map for messages that have been dispatched. It
-     * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
-     */
-    /**
-     * The thread will check the dispatchedMessages map for messages that have been dispatched. It
-     * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
+     * will create a new {@code QueueRemovalMessage} and send it to the other nodes
      */
     @Override
     public void run() {
@@ -2836,7 +2689,7 @@ public class HARegionQueue implements RegionQueue {
               dm.putOutgoing(qrm);
             } // messages exist
           } // be somewhat tolerant of failures
-          catch (CancelException e) {
+          catch (CancelException ignore) {
             if (logger.isDebugEnabled()) {
               logger.debug("QueueRemovalThread is exiting due to cancellation");
             }
@@ -2918,14 +2771,14 @@ public class HARegionQueue implements RegionQueue {
          * threadIdToSequenceIdMap.list.add(internalMap); } }
          */
         // first add the size within the lock
-        queueRemovalMessageList.add(Integer.valueOf(internalMap.size()));
+        queueRemovalMessageList.add(internalMap.size());
         internalIterator = internalMap.entrySet().iterator();
         // then add the event ids to the message list within the lock
         while (internalIterator.hasNext()) {
           internalEntry = (Map.Entry) internalIterator.next();
           tid = (ThreadIdentifier) internalEntry.getKey();
           sequenceId = (Long) internalEntry.getValue();
-          eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId.longValue());
+          eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId);
           queueRemovalMessageList.add(eventId);
         }
       }
@@ -2945,7 +2798,7 @@ public class HARegionQueue implements RegionQueue {
       boolean interrupted = Thread.interrupted();
       try {
         this.join(15 * 1000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         interrupted = true;
       } finally {
         if (interrupted) {
@@ -2960,13 +2813,9 @@ public class HARegionQueue implements RegionQueue {
   }
 
   /**
-   * Class whick keeps track of the positions ( keys) of underlying Region object for the events
+   * Class which keeps track of the positions ( keys) of underlying Region object for the events
    * placed in the Queue. It also keeps track of the last sequence ID dispatched. Thus all the
    * events with sequence ID less than that dispatched are eligible for removal
-   * 
-   * <p>
-   * author Asif
-   * 
    */
   public static class DispatchedAndCurrentEvents implements DataSerializableFixedID, Serializable {
     /**
@@ -3006,10 +2855,6 @@ public class HARegionQueue implements RegionQueue {
     /**
      * Used for debugging purpose to ensure that in no situation , for a given ThreadIdentifier the
      * order gets violated
-     * 
-     * <p>
-     * author Asif
-     * 
      */
     protected volatile long lastSequenceIDPut = INIT_OF_SEQUENCEID;
 
@@ -3024,15 +2869,9 @@ public class HARegionQueue implements RegionQueue {
      * @param event Object to be added to the queue
      * @param sequenceID Sequence ID of the event originating from a unqiue thread identified by its
      *        ThreadIdentifier
-     * @throws CacheException
-     * @throws InterruptedException
      */
     protected boolean putObject(Conflatable event, long sequenceID)
         throws CacheException, InterruptedException {
-      // logger.debug("BRUCE: putObject() lastSequenceIDPut="+lastSequenceIDPut
-      // +"; adding sequenceID="+sequenceID + " for " + event);
-      // logger.info("putObject, sequenceID = " + sequenceID + "; lastSequenceIDPut = " +
-      // lastSequenceIDPut, new Exception("putObject"));
       Long oldPosition = null;
       final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER);
       if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID
@@ -3072,7 +2911,7 @@ public class HARegionQueue implements RegionQueue {
         }
         if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) {
           // Insert the object into the Region
-          Long position = Long.valueOf(owningQueue.tailKey.incrementAndGet());
+          Long position = owningQueue.tailKey.incrementAndGet();
 
           owningQueue.putEventInHARegion(event, position);
 
@@ -3128,15 +2967,11 @@ public class HARegionQueue implements RegionQueue {
 
     /**
      * Destroys the the old entry ( which got replaced by the new entry due to conflation) from the
-     * availableIDs , Region & Counters set. Since this is executed within a synch block by the new
+     * availableIDs , Region & Counters set. Since this is executed within a sync block by the new
      * entry thread, it is guaranteed that the old entry thread will exit first , placing the
-     * poistion etc in the available IDs set. Also the new entry thraed & old entry thread are
-     * belonging to diffrenet ThreadIdentifier objects & hence hold different
+     * position etc in the available IDs set. Also the new entry thread & old entry thread are
+     * belonging to different ThreadIdentifier objects & hence hold different
      * DispatchedAndCurrentEvents object.
-     * 
-     * @param oldPosition
-     * @throws CacheException
-     * @throws InterruptedException
      */
     private void removeOldConflatedEntry(Long oldPosition)
         throws CacheException, InterruptedException {
@@ -3152,9 +2987,8 @@ public class HARegionQueue implements RegionQueue {
           }
           // </HA overflow>
           // update statistics
-          // if (logger.isDebugEnabled()) {
 
-          // vrao: Fix for bug 39291:
+          // Fix for bug 39291:
           // Since markers are always conflated regardless of the conflation
           // setting and they are not normal (are internal) events, we should
           // not bump the events-conflated stat for markers.
@@ -3163,7 +2997,6 @@ public class HARegionQueue implements RegionQueue {
           } else {
             owningQueue.stats.incMarkerEventsConflated();
           }
-          // }
         }
       }
     }
@@ -3184,13 +3017,10 @@ public class HARegionQueue implements RegionQueue {
       ConcurrentMap conflationMap = (ConcurrentMap) owningQueue.indexes.get(rName);
       Assert.assertTrue(conflationMap != null);
       conflationMap.remove(key, position);
-
     }
 
     /**
      * Removes the Entry from the Counters Set contained in DACE
-     * 
-     * @param position
      */
     protected synchronized void destroy(Long position) {
       if (this.counters != null) {
@@ -3227,17 +3057,17 @@ public class HARegionQueue implements RegionQueue {
             owningQueue.eventsMap.remove(ti);
             expired = true;
             this.owningQueue.getStatistics().decThreadIdentifiers();
-          } catch (RegionDestroyedException ignore) {
+          } catch (RegionDestroyedException e) {
             if (!owningQueue.destroyInProgress && logger.isDebugEnabled()) {
               logger.debug(
                   "DispatchedAndCurrentEvents::expireOrUpdate: Queue found destroyed while removing expiry entry for ThreadIdentifier={} and expiry value={}",
-                  ti, expVal, ignore);
+                  ti, expVal, e);
             }
           } catch (EntryNotFoundException enfe) {
             if (!owningQueue.destroyInProgress) {
               logger.error(LocalizedMessage.create(
                   LocalizedStrings.HARegionQueue_DISPATCHEDANDCURRENTEVENTSEXPIREORUPDATE_UNEXPECTEDLY_ENCOUNTERED_EXCEPTION_WHILE_REMOVING_EXPIRY_ENTRY_FOR_THREADIDENTIFIER_0_AND_EXPIRY_VALUE_1,
-                  new Object[] {ti, Long.valueOf(expVal), enfe}));
+                  new Object[] {ti, expVal, enfe}));
             }
           }
         }
@@ -3245,7 +3075,7 @@ public class HARegionQueue implements RegionQueue {
       if (!expired) {
         try {
           // Update the entry with latest sequence ID
-          owningQueue.region.put(ti, Long.valueOf(this.lastDispatchedSequenceId));
+          owningQueue.region.put(ti, this.lastDispatchedSequenceId);
         } catch (CancelException e) {
           throw e;
         } catch (Exception e) {
@@ -3267,8 +3097,6 @@ public class HARegionQueue implements RegionQueue {
      * be removed
      * 
      * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
-     * @throws CacheException
-     * @throws InterruptedException
      */
     protected void setLastDispatchedIDAndRemoveEvents(long lastDispatchedSeqId)
         throws CacheException, InterruptedException {
@@ -3341,8 +3169,6 @@ public class HARegionQueue implements RegionQueue {
      *        Events which have been peeked & are now candidate for removal. It has to be guaranteed
      *        that the sequence IDs of all the other counters is less than the last dispatched
      * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
-     * @throws CacheException
-     * @throws InterruptedException
      */
     protected void setLastDispatchedIDAndRemoveEvents(List removedEventInfoList,
         long lastDispatchedSeqId) throws CacheException, InterruptedException {
@@ -3472,8 +3298,6 @@ public class HARegionQueue implements RegionQueue {
 
   /**
    * destroys the underlying HARegion and removes its reference from the dispatched messages map
-   * 
-   * @throws CacheWriterException
    */
   public void destroy() throws CacheWriterException {
     this.destroyInProgress = true;
@@ -3484,9 +3308,9 @@ public class HARegionQueue implements RegionQueue {
     try {
       try {
         updateHAContainer();
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         // keep going
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         // keep going
         if (logger.isDebugEnabled()) {
           logger.debug("HARegionQueue#destroy: ignored cancellation!!!!");
@@ -3495,9 +3319,9 @@ public class HARegionQueue implements RegionQueue {
 
       try {
         this.region.destroyRegion();
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         // keep going
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         // keep going
       }
       ((HAContainerWrapper) haContainer).removeProxy(regionName);
@@ -3510,8 +3334,6 @@ public class HARegionQueue implements RegionQueue {
    * If the event is an instance of HAEventWrapper, put it into the haContainer and then into the ha
    * region. Otherwise, simply put it into the ha region.
    * 
-   * @param event
-   * @param position
    * @since GemFire 5.7
    */
   protected void putEventInHARegion(Conflatable event, Long position) {
@@ -3623,7 +3445,7 @@ public class HARegionQueue implements RegionQueue {
    * If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its
    * clientUpdateMessage into the haContainer as <key, value>.
    * 
-   * @param haEventWrapper An instance of <code>HAEventWrapper</code>
+   * @param haEventWrapper An instance of {@code HAEventWrapper}
    * @since GemFire 5.7
    */
   protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) {
@@ -3690,7 +3512,7 @@ public class HARegionQueue implements RegionQueue {
       Object[] wrapperArray = null;
       acquireReadLock();
       try {
-        if (!(this.availableIDsSize() == 0)) {
+        if (this.availableIDsSize() != 0) {
           wrapperArray = this.availableIDsArray();
         }
       } finally {
@@ -3714,7 +3536,7 @@ public class HARegionQueue implements RegionQueue {
                   HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
                 }
               }
-            } catch (CancelException e) {
+            } catch (CancelException ignore) {
               return; // we're done
             } catch (Exception e) {
               if (logger.isDebugEnabled()) {
@@ -3751,10 +3573,9 @@ public class HARegionQueue implements RegionQueue {
    * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
    * in the haContainer, then decrements its reference count. If the decremented ref count is zero
    * and put is not in progress, removes the entry from the haContainer, before returning the
-   * <code>ClientUpdateMessage</code> instance.
+   * {@code ClientUpdateMessage} instance.
    * 
-   * @param conflatable
-   * @return An instance of <code>ClientUpdateMessage</code>
+   * @return An instance of {@code ClientUpdateMessage}
    * @since GemFire 5.7
    */
   public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) {
@@ -3778,7 +3599,6 @@ public class HARegionQueue implements RegionQueue {
    * Decrements wrapper's reference count by one. If the decremented ref count is zero and put is
    * not in progress, removes the entry from the haContainer.
    * 
-   * @param wrapper
    * @since GemFire 5.7
    */
   public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) {
@@ -3813,7 +3633,7 @@ public class HARegionQueue implements RegionQueue {
 
   /**
    * Set whether the dispatcher of this node is active or not (i.e. primary or secondary node). If
-   * <code>flag</code> is set to <code>true</code>, disables Entry Expiry Tasks.
+   * {@code flag} is set to {@code true}, disables Entry Expiry Tasks.
    * 
    * @param flag the value to set isPrimary to
    */
@@ -3830,7 +3650,7 @@ public class HARegionQueue implements RegionQueue {
   }
 
   /**
-   * Disables EntryExpiryTask for the HARegion (<code>this.region</code>).
+   * Disables EntryExpiryTask for the HARegion ({@code this.region}).
    * 
    */
   private void disableEntryExpiryTasks() {
@@ -3842,7 +3662,7 @@ public class HARegionQueue implements RegionQueue {
       this.region.setCustomEntryTimeToLive(new ThreadIdentifierCustomExpiry());
       logger.info(LocalizedMessage.create(
           LocalizedStrings.HARegionQueue_ENYTRY_EXPIRY_TASKS_DISABLED_BECAUSE_QUEUE_BECAME_PRIMARY_OLD_MSG_TTL_0,
-          new Object[] {Integer.valueOf(oldTimeToLive)}));
+          new Object[] {oldTimeToLive}));
     }
   }
 
@@ -3882,7 +3702,7 @@ public class HARegionQueue implements RegionQueue {
     if (r != null && !r.isDestroyed()) {
       try {
         r.close();
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
       }
     }
   }
@@ -3895,97 +3715,83 @@ public class HARegionQueue implements RegionQueue {
    */
   public boolean isPeekInitialized() {
     return HARegionQueue.peekedEventsContext.get() != null;
-
   }
 
-}
-
-
-/**
- * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is operating
- * on it. This wrapper acts as a means of communication between the QRM thread & the MapWrapper
- * object contained in the HARegionQueue
- * 
- * <p>
- * author ashahid
- */
-
-class MapWrapper {
-  Map map;
+  /**
+   * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is
+   * operating on it. This wrapper acts as a means of communication between the QRM thread & the
+   * MapWrapper object contained in the HARegionQueue
+   */
+  static class MapWrapper {
+    Map map;
 
-  List list;
+    List list;
 
-  boolean keepPrevAcks = false;
+    boolean keepPrevAcks = false;
 
-  public MapWrapper() {
-    super();
-    map = new HashMap();
-    list = new LinkedList();
-  }
+    public MapWrapper() {
+      super();
+      map = new HashMap();
+      list = new LinkedList();
+    }
 
-  void put(Object key, Object o) {
-    synchronized (this.map) {
-      this.map.put(key, o);
+    void put(Object key, Object o) {
+      synchronized (this.map) {
+        this.map.put(key, o);
+      }
     }
   }
-}
-
 
-/**
- * A wrapper class that has counter, key and the region-name for an event which was peeked and needs
- * to be removed. The key and regionName fields will be set only if conflation is true for the
- * event.
- * 
- * <p>
- * author dpatel
- * 
- */
-
-class RemovedEventInfo {
-  Long counter;
+  /**
+   * A wrapper class that has counter, key and the region-name for an event which was peeked and
+   * needs to be removed. The key and regionName fields will be set only if conflation is true for
+   * the event.
+   */
+  static class RemovedEventInfo {
+    Long counter;
 
-  String regionName;
+    String regionName;
 
-  Object key;
+    Object key;
 
-  public RemovedEventInfo(Long counter, String regionName, Object key) {
-    this.counter = counter;
-    this.regionName = regionName;
-    this.key = key;
+    public RemovedEventInfo(Long counter, String regionName, Object key) {
+      this.counter = counter;
+      this.regionName = regionName;
+      this.key = key;
+    }
   }
-}
 
+  /** this is used to expire thread identifiers, even in primary queues */
+  static class ThreadIdentifierCustomExpiry implements CustomExpiry {
+    private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
+        HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
+    private static volatile ExpirationAttributes testExpAtts = null;
 
-/** this is used to expire thread identifiers, even in primary queues */
-class ThreadIdentifierCustomExpiry implements CustomExpiry {
-  private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
-      HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
-  private static volatile ExpirationAttributes testExpAtts = null;
-
-  public ExpirationAttributes getExpiry(Region.Entry entry) {
-    // Use key to determine expiration.
-    Object key = entry.getKey();
-    if (key instanceof ThreadIdentifier) {
-      final int expTime = HARegionQueue.threadIdExpiryTime;
-      if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
-        // This should only happen in unit test code
-        ExpirationAttributes result = testExpAtts;
-        if (result == null || result.getTimeout() != expTime) {
-          result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
-          // save the expiration attributes in a static to prevent tests from creating lots of
-          // instances.
-          testExpAtts = result;
+    public ExpirationAttributes getExpiry(Region.Entry entry) {
+      // Use key to determine expiration.
+      Object key = entry.getKey();
+      if (key instanceof ThreadIdentifier) {
+        final int expTime = HARegionQueue.threadIdExpiryTime;
+        if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
+          // This should only happen in unit test code
+          ExpirationAttributes result = testExpAtts;
+          if (result == null || result.getTimeout() != expTime) {
+            result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
+            // save the expiration attributes in a static to prevent tests from creating lots of
+            // instances.
+            testExpAtts = result;
+          }
+          return result;
+        } else {
+          return DEFAULT_THREAD_ID_EXP_ATTS;
         }
-        return result;
       } else {
-        return DEFAULT_THREAD_ID_EXP_ATTS;
+        return null;
       }
-    } else {
-      return null;
     }
-  }
 
-  public void close() {}
+    public void close() {}
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index ca9cc20..7879538 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -33,6 +33,7 @@ import org.apache.geode.distributed.internal.PooledDistributionMessage;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -42,34 +43,17 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
  * This message is sent to all the nodes in the DistributedSystem. It contains the list of messages
  * that have been dispatched by this node. The messages are received by other nodes and the
  * processing is handed over to an executor
- * 
- * 
  */
 public final class QueueRemovalMessage extends PooledDistributionMessage {
   private static final Logger logger = LogService.getLogger();
 
-  // /**
-  // * Executor for processing incoming messages
-  // */
-  // private static final Executor executor;
-
-
   /**
    * List of messages (String[] )
    */
   private List messagesList;
 
-  // /**
-  // * create the executor in a static block
-  // */
-  // static {
-  // //TODO:Mitul best implementation of executor for this task?
-  // executor = Executors.newCachedThreadPool();
-  // }
-
   /**
    * Constructor : Set the recipient list to ALL_RECIPIENTS
-   * 
    */
   public QueueRemovalMessage() {
     this.setRecipient(ALL_RECIPIENTS);
@@ -77,8 +61,6 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
 
   /**
    * Set the message list
-   * 
-   * @param messages
    */
   public void setMessagesList(List messages) {
     this.messagesList = messages;
@@ -87,22 +69,19 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
   /**
    * Extracts the region from the message list and hands over the message removal task to the
    * executor
-   * 
-   * @param dm
    */
   @Override
   protected void process(DistributionManager dm) {
-
-    final GemFireCacheImpl cache;
+    final InternalCache cache;
     // use GemFireCache.getInstance to avoid blocking during cache.xml processing.
-    cache = GemFireCacheImpl.getInstance(); // CacheFactory.getAnyInstance();
+    cache = GemFireCacheImpl.getInstance();
     if (cache != null) {
       Iterator iterator = this.messagesList.iterator();
       int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
       try {
         while (iterator.hasNext()) {
           final String regionName = (String) iterator.next();
-          final int size = ((Integer) iterator.next()).intValue();
+          final int size = (Integer) iterator.next();
           final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
           final HARegionQueue hrq;
           if (region == null || !region.isInitialized()) {
@@ -134,21 +113,21 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
                       regionName, id);
                 }
                 hrq.removeDispatchedEvents(id);
-              } catch (RegionDestroyedException rde) {
+              } catch (RegionDestroyedException ignore) {
                 logger.info(LocalizedMessage.create(
                     LocalizedStrings.QueueRemovalMessage_QUEUE_FOUND_DESTROYED_WHILE_PROCESSING_THE_LAST_DISPTACHED_SEQUENCE_ID_FOR_A_HAREGIONQUEUES_DACE_THE_EVENT_ID_IS_0_FOR_HAREGION_WITH_NAME_1,
                     new Object[] {id, regionName}));
-              } catch (CancelException e) {
+              } catch (CancelException ignore) {
                 return; // cache or DS is closing
               } catch (CacheException e) {
                 logger.error(LocalizedMessage.create(
                     LocalizedStrings.QueueRemovalMessage_QUEUEREMOVALMESSAGEPROCESSEXCEPTION_IN_PROCESSING_THE_LAST_DISPTACHED_SEQUENCE_ID_FOR_A_HAREGIONQUEUES_DACE_THE_PROBLEM_IS_WITH_EVENT_ID__0_FOR_HAREGION_WITH_NAME_1,
                     new Object[] {regionName, id}), e);
-              } catch (InterruptedException ie) {
+              } catch (InterruptedException ignore) {
                 return; // interrupt occurs during shutdown. this runs in an executor, so just stop
                         // processing
               }
-            } catch (RejectedExecutionException e) {
+            } catch (RejectedExecutionException ignore) {
               interrupted = true;
             } finally {
               if (interrupted) {
@@ -165,14 +144,13 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
 
   @Override
   public void toData(DataOutput out) throws IOException {
-    /**
+    /*
      * first write the total list size then in a loop write the region name, number of eventIds and
      * the event ids
-     * 
      */
     super.toData(out);
     // write the size of the data list
-    DataSerializer.writeInteger(Integer.valueOf(this.messagesList.size()), out);
+    DataSerializer.writeInteger(this.messagesList.size(), out);
     Iterator iterator = messagesList.iterator();
     String regionName = null;
     Integer numberOfIds = null;
@@ -185,7 +163,7 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
       numberOfIds = (Integer) iterator.next();
       // write the number of event ids
       DataSerializer.writeInteger(numberOfIds, out);
-      maxVal = numberOfIds.intValue();
+      maxVal = numberOfIds;
       // write the event ids
       for (int i = 0; i < maxVal; i++) {
         eventId = iterator.next();
@@ -200,14 +178,13 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    /**
+    /*
      * read the total list size, reconstruct the message list in a loop by reading the region name,
      * number of eventIds and the event ids
-     * 
      */
     super.fromData(in);
     // read the size of the message
-    int size = DataSerializer.readInteger(in).intValue();
+    int size = DataSerializer.readInteger(in);
     this.messagesList = new LinkedList();
     int eventIdSizeInt;
     for (int i = 0; i < size; i++) {
@@ -216,7 +193,7 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
       // read the datasize
       Integer eventIdSize = DataSerializer.readInteger(in);
       this.messagesList.add(eventIdSize);
-      eventIdSizeInt = eventIdSize.intValue();
+      eventIdSizeInt = eventIdSize;
       // read the total number of events
       for (int j = 0; j < eventIdSizeInt; j++) {
         this.messagesList.add(DataSerializer.readObject(in));

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
index df6e2f2..eeb3704 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
@@ -134,7 +134,7 @@ public class BecomePrimaryBucketMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId);
     buff.append("; isRebalance=").append(this.isRebalance);

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
index 04349aa..ed17740 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
@@ -114,7 +114,7 @@ public final class BucketSizeMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
index 3cca861..d6422c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
@@ -143,7 +143,7 @@ public final class ContainsKeyValueMessage extends PartitionMessageWithDirectRep
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; valueCheck=").append(this.valueCheck).append("; key=").append(this.key)
         .append("; bucketId=").append(this.bucketId);

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
index 03b5ded..744e013 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
@@ -172,9 +172,10 @@ public final class CreateBucketMessage extends PartitionMessage {
    * Assists the toString method in reporting the contents of this message
    * 
    * @see PartitionMessage#toString()
+   * @param buff
    */
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId).append("; bucketSize=")
         .append(this.bucketSize);

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
index e765df0..f305fa8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
@@ -113,7 +113,7 @@ public class DeposePrimaryBucketMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index dc55835..bffaf4d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -419,9 +419,10 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
    * Assists the toString method in reporting the contents of this message
    * 
    * @see PartitionMessage#toString()
+   * @param buff
    */
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; key=").append(getKey());
     if (originalSender != null) {

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
index e9468dc..6314f2d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
@@ -315,7 +315,7 @@ public final class DumpB2NRegion extends PartitionMessage {
    * StringBuffer)
    */
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append(" bucketId=").append(this.bucketId).append(" primaryInfoOnly=")
         .append(this.onlyReturnPrimaryInfo);

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
index 4a09f94..0502d5f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
@@ -107,7 +107,7 @@ public class EndBucketCreationMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId);
     buff.append("; newPrimary=").append(this.newPrimary);

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
index 7208baf..41186ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -47,7 +47,6 @@ import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketDump;
 import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.PartitionedRegion;
@@ -139,7 +138,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketIds);
     buff.append("; recipient=").append(this.getRecipient());

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
index b0f052a..c7ca279 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
@@ -128,7 +128,7 @@ public final class FetchEntriesMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; bucketId=").append(this.bucketId);
     buff.append("; recipient=").append(this.getRecipient());

http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
index ae2ce37..301e154 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
@@ -174,7 +174,7 @@ public final class FetchEntryMessage extends PartitionMessage {
   }
 
   @Override
-  protected void appendFields(StringBuffer buff) {
+  protected void appendFields(StringBuilder buff) {
     super.appendFields(buff);
     buff.append("; key=").append(this.key);
   }