You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/01 19:49:19 UTC
[06/36] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 7ee5a8d..9c2ea23 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -14,8 +14,52 @@
*/
package org.apache.geode.internal.cache.ha;
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.MirrorType;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.query.internal.CqQueryVsdStats;
import org.apache.geode.cache.query.internal.cq.CqService;
@@ -30,9 +74,20 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.internal.cache.tier.sockets.*;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.cache.tier.sockets.HandShake;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -41,17 +96,6 @@ import org.apache.geode.internal.util.concurrent.StoppableCondition;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.*;
/**
* An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be
@@ -72,26 +116,23 @@ import java.util.concurrent.locks.*;
*
* 30 May 2008: 5.7 onwards the underlying GemFire Region will continue to have key as counter(long)
* but the value will be a wrapper object(HAEventWrapper) which will be a key in a separate data
- * strucure called haContainer (an implementation of Map). The value against this wrapper will be
+ * structure called haContainer (an implementation of Map). The value against this wrapper will be
* the offered object in the queue. The purpose of this modification is to allow multiple
* ha-region-queues share their offered values without storing separate copies in memory, upon GII.
*
* (See BlockingHARegionQueue)
*
- *
* @since GemFire 4.3
- *
*/
public class HARegionQueue implements RegionQueue {
private static final Logger logger = LogService.getLogger();
- /** The <code>Region</code> backing this queue */
+ /** The {@code Region} backing this queue */
protected HARegion region;
/**
- * The key into the <code>Region</code> used when putting entries onto the queue. The counter uses
+ * The key into the {@code Region} used when putting entries onto the queue. The counter uses
* incrementAndGet so counter will always be started from 1
- *
*/
protected final AtomicLong tailKey = new AtomicLong(0);
@@ -100,26 +141,21 @@ public class HARegionQueue implements RegionQueue {
* object. Every add operation will be identified by the ThreadIdentifier object & the position
* recorded in the LastDispatchedAndCurrentEvents object.
*/
-
protected final ConcurrentMap eventsMap = new ConcurrentHashMap();
/**
- * The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
- * updating of entries in the queue for conflation.
+ * The {@code Map} mapping the regionName->key to the queue key. This index allows fast updating
+ * of entries in the queue for conflation.
*/
protected volatile Map indexes = Collections.unmodifiableMap(new HashMap());
- // TODO:Asif: Should we worry about whether to some how make it writer
- // preference?
- /** Lock object for updating the queue size by different operations */
- // private final Object SIZE_LOCK = new Object();
private final StoppableReentrantReadWriteLock rwLock;
private final StoppableReentrantReadWriteLock.StoppableReadLock readLock;
private final StoppableWriteLock writeLock;
- /** The name of the <code>Region</code> backing this queue */
+ /** The name of the {@code Region} backing this queue */
private final String regionName;
/** The ClientProxyMembershipID associated with the ha queue */
@@ -151,10 +187,7 @@ public class HARegionQueue implements RegionQueue {
* A sequence violation can occur , if an HARegionQueue receives events thru GII & the same event
* also arrives via Gemfire Put in that local VM. If the HARegionQueue does not receive any data
* via GII , then there should not be any violation. If there is data arriving thru GII, such
- * voiolations can be expected , but should be analyzed thoroughly.
- *
- * <p>
- * author Asif
+ * violations can be expected , but should be analyzed thoroughly.
*/
protected boolean puttingGIIDataInQueue;
@@ -166,14 +199,12 @@ public class HARegionQueue implements RegionQueue {
/**
* a thread local to store the counters corresponding to the events peeked by a particular thread.
- * When <code>remove()</code> will be called, these events stored in thread-local will be
- * destroyed.
+ * When {@code remove()} will be called, these events stored in thread-local will be destroyed.
*/
protected static final ThreadLocal peekedEventsContext = new ThreadLocal();
/**
- * Thread which creates the <code>QueueRemovalMessage</code> and sends it to other nodes in the
- * system
+ * Thread which creates the {@code QueueRemovalMessage} and sends it to other nodes in the system
*/
private static QueueRemovalThread qrmThread;
@@ -284,30 +315,18 @@ public class HARegionQueue implements RegionQueue {
protected long maxQueueSizeHitCount = 0;
/**
- *
* Processes the given string and returns a string which is allowed for region names
*
- * @param regionName
* @return legal region name
*/
public static String createRegionName(String regionName) {
- String result = regionName.replace('/', '#'); // [yogi]: region name cannot
- // contain the separator '/'
- return result;
+ return regionName.replace('/', '#');
}
/**
- *
- * @param regionName
- * @param cache
* @param isPrimary whether this is the primary queue for a client
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
*/
-
- protected HARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected HARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes haAttributes, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -332,13 +351,13 @@ public class HARegionQueue implements RegionQueue {
this.readLock = this.rwLock.readLock();
this.writeLock = this.rwLock.writeLock();
- this.putGIIDataInRegion();
+ putGIIDataInRegion();
if (this.getClass() == HARegionQueue.class) {
initialized.set(true);
}
}
- private void createHARegion(String processedRegionName, GemFireCacheImpl cache)
+ private void createHARegion(String processedRegionName, InternalCache cache)
throws IOException, ClassNotFoundException {
AttributesFactory af = new AttributesFactory();
af.setMirrorType(MirrorType.KEYS_VALUES);
@@ -358,7 +377,7 @@ public class HARegionQueue implements RegionQueue {
   * reinitialize the queue, presumably pulling current information from secondaries
*/
public void reinitializeRegion() {
- GemFireCacheImpl cache = this.region.getCache();
+ InternalCache cache = this.region.getCache();
String regionName = this.region.getName();
this.region.destroyRegion();
Exception problem = null;
@@ -412,7 +431,7 @@ public class HARegionQueue implements RegionQueue {
// use putIfAbsent to avoid overwriting newer dispatch information
Object o = this.eventsMap.putIfAbsent(entry.getKey(), giiDace);
if (o != null && isDebugEnabled_BS) {
- sb.append(" -- could not store. found " + o);
+ sb.append(" -- could not store. found ").append(o);
}
}
}
@@ -425,11 +444,8 @@ public class HARegionQueue implements RegionQueue {
* Repopulates the HARegion after the GII is over so as to reset the counters and populate the
* DACE objects for the thread identifiers . This method should be invoked as the last method in
* the constructor . Thus while creating BlockingQueue this method should be invoked lastly in the
- * derived class contructor , after the HARegionQueue contructor is complete. Otherwise, the
+   * derived class constructor, after the HARegionQueue constructor is complete. Otherwise, the
* ReentrantLock will be null.
- *
- * @throws CacheException
- * @throws InterruptedException
*/
void putGIIDataInRegion() throws CacheException, InterruptedException {
Set entrySet = this.region.entries(false);
@@ -498,8 +514,6 @@ public class HARegionQueue implements RegionQueue {
* Puts the GII'd entry into the ha region, if it was GII'd along with its ClientUpdateMessageImpl
* instance.
*
- * @param val
- * @throws InterruptedException
* @since GemFire 5.7
*/
protected void putInQueue(Object val) throws InterruptedException {
@@ -507,7 +521,7 @@ public class HARegionQueue implements RegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(
"HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.",
- ((HAEventWrapper) val).getKeyToConflate());
+ ((Conflatable) val).getKeyToConflate());
}
} else {
this.put(val);
@@ -567,9 +581,6 @@ public class HARegionQueue implements RegionQueue {
* object & SIZE Lock
*
* @param object object to put onto the queue
- * @throws InterruptedException
- * @throws CacheException
- * @return boolean
*/
public boolean put(Object object) throws CacheException, InterruptedException {
this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
@@ -663,11 +674,9 @@ public class HARegionQueue implements RegionQueue {
dace = oldDace;
} else {
// Add the recently added ThreadIdentifier to the RegionQueue for expiry
- this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+ this.region.put(ti, dace.lastDispatchedSequenceId);
// update the stats
- // if (logger.isDebugEnabled()) {
this.stats.incThreadIdentifiers();
- // }
}
if (!dace.putObject(event, sequenceID)) {
this.put(object);
@@ -677,11 +686,6 @@ public class HARegionQueue implements RegionQueue {
}
}
}
- // update the stats
- // if (logger.isDebugEnabled()) {
- // this.stats.incEventsEnqued();
- // }
-
}
/**
@@ -691,7 +695,7 @@ public class HARegionQueue implements RegionQueue {
*/
public void startGiiQueueing() {
this.giiLock.writeLock().lock();
- this.giiCount++;
+ this.giiCount++; // TODO: non-atomic operation on volatile!
if (logger.isDebugEnabled()) {
logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount);
}
@@ -710,7 +714,7 @@ public class HARegionQueue implements RegionQueue {
this.giiLock.writeLock().lock();
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
- this.giiCount--;
+ this.giiCount--; // TODO: non-atomic operation on volatile!
if (isDebugEnabled) {
logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount);
}
@@ -731,15 +735,7 @@ public class HARegionQueue implements RegionQueue {
Object value;
try {
value = this.giiQueue.remove();
- } catch (NoSuchElementException e) {
- // if (actualCount != expectedCount) {
- // logger.severe(LocalizedStrings.DEBUG, "expected to drain "
- // + expectedCount + " messages but drained " + actualCount
- // + " queue.size() is now " + giiQueue.size() + ", in queue" + this, e);
- // } else {
- // logger.severe(LocalizedStrings.DEBUG, "drained " + actualCount + " messages. Queue
- // size is " + giiQueue.size() + " in " + this);
- // }
+ } catch (NoSuchElementException ignore) {
break;
}
actualCount++;
@@ -765,13 +761,11 @@ public class HARegionQueue implements RegionQueue {
if (value instanceof HAEventWrapper) {
decAndRemoveFromHAContainer((HAEventWrapper) value);
}
- } catch (NoSuchElementException e) {
+ } catch (NoSuchElementException ignore) {
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
// complete draining while holding the write-lock so nothing else
// can get into the queue
- // logger.severe(LocalizedStrings.DEBUG, "endGiiQueueing interrupted - ignoring until
- // draining completes");
interrupted = true;
}
}
@@ -804,19 +798,19 @@ public class HARegionQueue implements RegionQueue {
*/
public Map getEventMapForGII() {
// fix for bug #41621 - concurrent modification exception while serializing event map
- Map<ThreadIdentifier, DispatchedAndCurrentEvents> events = this.eventsMap;
final boolean isDebugEnabled = logger.isDebugEnabled();
do {
HashMap result = new HashMap();
try {
- for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : events.entrySet()) {
+ for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : ((Map<ThreadIdentifier, DispatchedAndCurrentEvents>) this.eventsMap)
+ .entrySet()) {
if (entry.getValue().isCountersEmpty()) {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
- } catch (ConcurrentModificationException e) { // TODO:WTF: bad practice but eventsMap is
- // ConcurrentHashMap
+ } catch (ConcurrentModificationException ignore) {
+ // TODO:WTF: bad practice but eventsMap is ConcurrentHashMap
if (isDebugEnabled) {
logger.debug(
"HARegion encountered concurrent modification exception while analysing event state - will try again");
@@ -826,9 +820,7 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Implementation in BlokcingHARegionQueue class
- *
- * @throws InterruptedException
+ * Implementation in BlockingHARegionQueue class
*/
void checkQueueSizeConstraint() throws InterruptedException {
if (Thread.interrupted())
@@ -846,13 +838,11 @@ public class HARegionQueue implements RegionQueue {
/**
   * Creates the static dispatchedMessagesMap (if not present) and starts the QueueRemovalThread if
* not running
- *
*/
- public static synchronized void startHAServices(GemFireCacheImpl c) {
-
+ static synchronized void startHAServices(InternalCache cache) {
if (qrmThread == null) {
dispatchedMessagesMap = new ConcurrentHashMap();
- qrmThread = new QueueRemovalThread(c);
+ qrmThread = new QueueRemovalThread(cache);
qrmThread.setName("Queue Removal Thread");
qrmThread.start();
}
@@ -913,20 +903,16 @@ public class HARegionQueue implements RegionQueue {
}
}
Object key = event.getKeyToConflate();
- Long previousPosition = (Long) latestIndexesForRegion.put(key, newPosition);
- return previousPosition;
-
+ return (Long) latestIndexesForRegion.put(key, newPosition);
}
/**
- *
* Creates and returns a ConcurrentMap. This method is over-ridden in test classes to test some
* functionality
*
* @return new ConcurrentMap
*/
ConcurrentMap createConcurrentMap() {
-
return new ConcurrentHashMap();
}
@@ -948,7 +934,7 @@ public class HARegionQueue implements RegionQueue {
// if (!HARegionQueue.this.isPrimary()) {
HARegionQueue.this.expireTheEventOrThreadIdentifier(event);
// }
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// ignore, we're done
} catch (CacheException ce) {
if (!destroyInProgress) {
@@ -967,11 +953,8 @@ public class HARegionQueue implements RegionQueue {
* overridden function createCacheListenerForHARegion of the HARegionQueueJUnitTest class for the
* test testConcurrentEventExpiryAndTake. This function provides the meaningful functionality for
* expiry of the Event object as well as ThreadIdentifier
- * <p>
- * author Asif
- *
+ *
* @param event event object representing the data being expired
- * @throws CacheException
*/
void expireTheEventOrThreadIdentifier(EntryEvent event) throws CacheException {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -980,11 +963,6 @@ public class HARegionQueue implements RegionQueue {
"HARegionQueue::afterInvalidate. Entry Event being invalidated:{}, isPrimaryQueue:{}",
event, HARegionQueue.this.isPrimary());
}
- // if (HARegionQueue.this.isPrimary()) {
- // logger.info(LocalizedStrings.DEBUG,
- // "HARegionQueue: Entry Event being invalidated ="
- // + event+", after current queue became primary.");
- // }
Object key = event.getKey();
if (key instanceof ThreadIdentifier) {
// Check if the sequenceID present as value against this key is same
@@ -998,7 +976,7 @@ public class HARegionQueue implements RegionQueue {
(DispatchedAndCurrentEvents) HARegionQueue.this.eventsMap.get(key);
Assert.assertTrue(dace != null);
Long expirySequenceID = (Long) event.getOldValue();
- boolean expired = dace.expireOrUpdate(expirySequenceID.longValue(), (ThreadIdentifier) key);
+ boolean expired = dace.expireOrUpdate(expirySequenceID, (ThreadIdentifier) key);
if (isDebugEnabled) {
logger.debug(
"HARegionQueue::afterInvalidate:Size of the region after expiring or updating the ThreadIdentifier={}",
@@ -1028,21 +1006,18 @@ public class HARegionQueue implements RegionQueue {
/**
* This method adds the position of newly added object to the List of available IDs so that it is
- * avaialble for peek or take. This method is called from DispatchedAndCurrentEvents object. This
+ * available for peek or take. This method is called from DispatchedAndCurrentEvents object. This
* method is invoked in a write lock for non blocking queue & in a reentrant lock in a blocking
- * queue. In case of blokcing queue , this method also signals the waiting take & peek threads to
+ * queue. In case of blocking queue , this method also signals the waiting take & peek threads to
* awake.
- * <p>
- * author Asif
- *
+ *
* @param position The Long position of the object which has been added
- * @throws InterruptedException
*/
void publish(Long position) throws InterruptedException {
acquireWriteLock();
try {
this.idsAvailable.add(position);
- // Asif:Notify the wiating peek threads or take threads of blocking queue
+ // Notify the waiting peek threads or take threads of blocking queue
// A void operation for the non blocking queue operations
notifyPeekAndTakeThreads();
} finally {
@@ -1055,12 +1030,8 @@ public class HARegionQueue implements RegionQueue {
}
/**
- *
- * <p>
- * author Asif
- *
* @param position Long value present in the Available IDs map against which Event object is
- * present in HARegion. This function is directly ivnoked from the basicInvalidate function
+ * present in HARegion. This function is directly invoked from the basicInvalidate function
* where expiry is aborted if this function returns false
* @return boolean true if the position could be removed from the Set
* @throws InterruptedException *
@@ -1087,12 +1058,10 @@ public class HARegionQueue implements RegionQueue {
* in the AvailableID Set. If the position existed in the Set, then only it is removed from the
* Set & the underlying Region
*
- * @param position Long poistion counter for entry in the Region
+ * @param position Long position counter for entry in the Region
* @return true if the entry with <br>
* position <br>
* specified was removed from the Set
- * @throws InterruptedException
- *
*/
protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException {
boolean removedOK = this.destroyFromAvailableIDs(position);
@@ -1100,9 +1069,7 @@ public class HARegionQueue implements RegionQueue {
if (removedOK) {
try {
this.destroyFromQueue(position);
- } catch (EntryNotFoundException enfe) {
- // if (!this.region.isDestroyed()) {
- // if (!HARegionQueue.this.destroyInProgress || !this.region.isDestroyed()) {
+ } catch (EntryNotFoundException ignore) {
if (!HARegionQueue.this.destroyInProgress) {
if (!this.region.isDestroyed()) {
Assert.assertTrue(false, "HARegionQueue::remove: The position " + position
@@ -1128,7 +1095,7 @@ public class HARegionQueue implements RegionQueue {
maintainCqStats(event, -1);
}
- /** Returns the <code>toString</code> for this RegionQueue object */
+ /** Returns the {@code toString} for this RegionQueue object */
@Override
public String toString() {
return "RegionQueue on " + this.regionName + "(" + (this.isPrimary ? "primary" : "backup")
@@ -1144,12 +1111,9 @@ public class HARegionQueue implements RegionQueue {
/**
- * This method is inoked by the take function . For non blocking queue it returns null or a valid
+ * This method is invoked by the take function . For non blocking queue it returns null or a valid
* long position while for blocking queue it waits for data in the queue or throws Exception if
* the thread encounters exception while waiting.
- *
- * @throws CacheException
- * @throws InterruptedException
*/
protected Long getAndRemoveNextAvailableID() throws InterruptedException {
Long next = null;
@@ -1177,14 +1141,9 @@ public class HARegionQueue implements RegionQueue {
/**
* Returns the next position counter present in idsAvailable set. This method is invoked by the
* peek function. In case of BlockingQueue, this method waits till a valid ID is available.
- *
- * <p>
- * author Asif
- *
+ *
   * @return valid Long position or null depending upon the nature of the queue
- * @throws InterruptedException
* @throws TimeoutException if operation is interrupted (unfortunately)
- *
*/
private Long getNextAvailableID() throws InterruptedException {
Long next = null;
@@ -1208,7 +1167,6 @@ public class HARegionQueue implements RegionQueue {
* For non blocking queue , this method either returns null or an Object. For blocking queue it
* will always return with an Object or wait for queue to be populated.
*
- * @throws InterruptedException
* @throws CacheException The exception can be thrown by BlockingQueue if it encounters
* InterruptedException while waiting for data
*/
@@ -1281,8 +1239,6 @@ public class HARegionQueue implements RegionQueue {
/**
* Removes the events that were peeked by this thread. The events are destroyed from the queue and
* conflation map and DispatchedAndCurrentEvents are updated accordingly.
- *
- * @throws InterruptedException
*/
public void remove() throws InterruptedException {
List peekedIds = (List) HARegionQueue.peekedEventsContext.get();
@@ -1324,10 +1280,10 @@ public class HARegionQueue implements RegionQueue {
List countersList;
if ((countersList = (List) groupedThreadIDs.get(threadid)) != null) {
countersList.add(info);
- countersList.set(0, Long.valueOf(sequenceId));
+ countersList.set(0, sequenceId);
} else {
countersList = new ArrayList();
- countersList.add(Long.valueOf(sequenceId));
+ countersList.add(sequenceId);
countersList.add(info);
groupedThreadIDs.put(threadid, countersList);
}
@@ -1344,7 +1300,7 @@ public class HARegionQueue implements RegionQueue {
Map.Entry element = (Map.Entry) iter.next();
ThreadIdentifier tid = (ThreadIdentifier) element.getKey();
List removedEvents = (List) element.getValue();
- long lastDispatchedId = ((Long) removedEvents.remove(0)).longValue();
+ long lastDispatchedId = (Long) removedEvents.remove(0);
DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(tid);
if (dace != null && dace.lastDispatchedSequenceId < lastDispatchedId) {
try {
@@ -1387,7 +1343,7 @@ public class HARegionQueue implements RegionQueue {
if (next == null) {
break;
}
- } catch (TimeoutException te) {
+ } catch (TimeoutException ignore) {
throw new InterruptedException();
}
object = (Conflatable) this.region.get(next);
@@ -1459,8 +1415,6 @@ public class HARegionQueue implements RegionQueue {
* @param timeToWait The number of milliseconds to attempt to peek
*
* @return The list of events peeked
- * @throws InterruptedException
- *
*/
public List peek(int batchSize, int timeToWait) throws InterruptedException {
long start = System.currentTimeMillis();
@@ -1506,7 +1460,7 @@ public class HARegionQueue implements RegionQueue {
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(50); // TODO this seems kinda busy IMNSHO -- jason
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
this.region.getCancelCriterion().checkCancelInProgress(null);
} finally {
@@ -1520,11 +1474,10 @@ public class HARegionQueue implements RegionQueue {
/**
* This method prepares the batch of events and updates the thread-context with corresponding
* counters, so that when remove is called by this thread, these events are destroyed from the
- * queue.This method should always be invoked within the <code>rwLock</code>.
+   * queue. This method should always be invoked within the {@code rwLock}.
*
* @param batchSize - number of events to be peeked
* @return - list of events peeked
- * @throws CacheException
*/
private List getBatchAndUpdateThreadContext(int batchSize) {
Iterator itr = this.idsAvailable.iterator();
@@ -1559,26 +1512,22 @@ public class HARegionQueue implements RegionQueue {
}
public void addCacheListener(CacheListener listener) {
- // TODO Auto-generated method stub
-
+ // nothing
}
public void removeCacheListener() {
- // TODO Auto-generated method stub
+ // nothing
}
/**
* It adds the entry to a static data structure dispatchedMessagesMap which is periodically
* operated upon by the QRM thread.
- *
- * <p>
- * author Asif
- *
+ *
* @param tid - the ThreadIdentifier object for this event
* @param sequenceId - the sequence id for this event
*/
public void addDispatchedMessage(ThreadIdentifier tid, long sequenceId) {
- Long lastSequenceNumber = Long.valueOf(sequenceId);
+ Long lastSequenceNumber = sequenceId;
boolean wasEmpty = false;
Long oldvalue = null;
Map internalMap = null;
@@ -1734,16 +1683,10 @@ public class HARegionQueue implements RegionQueue {
   * creates a DACE. Only one QRM operates at a time on a DACE & any other message will be waiting
* for the current thread to exit. This is accomplished by taking a lock on QRM_LOCK object in the
* DACE.
- *
- * <p>
- * author Asif
- *
+ *
* @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence
* Id
- * @throws CacheException
- * @throws InterruptedException
*/
-
void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException {
ThreadIdentifier ti = getThreadIdentifier(lastDispatched);
long sequenceID = lastDispatched.getSequenceID();
@@ -1764,11 +1707,9 @@ public class HARegionQueue implements RegionQueue {
} else {
// Add the recently added ThreadIdentifier to the RegionQueue for
// expiry
- this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+ this.region.put(ti, dace.lastDispatchedSequenceId);
// update the stats
- // if (logger.isDebugEnabled()) {
this.stats.incThreadIdentifiers();
- // }
}
}
}
@@ -1778,7 +1719,6 @@ public class HARegionQueue implements RegionQueue {
*
* @return the size of the queue
*/
-
public int size() {
acquireReadLock();
try {
@@ -1788,12 +1728,8 @@ public class HARegionQueue implements RegionQueue {
}
}
- void decrementTakeSidePutPermits() {
-
- }
-
void incrementTakeSidePutPermits() {
-
+ // nothing
}
// called from dace on a put.
@@ -1929,13 +1865,8 @@ public class HARegionQueue implements RegionQueue {
/**
* Always returns false for a HARegionQueue class. Suitably overridden in BlockingHARegionQueue
* class.
- *
- * <p>
- * author Asif
- *
+ *
   * @return false for HARegionQueue as this is a non blocking class
- * @throws InterruptedException
- *
*/
boolean waitForData() throws InterruptedException {
return false;
@@ -1976,12 +1907,8 @@ public class HARegionQueue implements RegionQueue {
* @param cache Gemfire Cache instance
* @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws CacheException
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
final int haRgnQType, final boolean isDurable)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
Map container = null;
@@ -1993,7 +1920,7 @@ public class HARegionQueue implements RegionQueue {
container = new HashMap();
}
- return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache,
+ return getHARegionQueueInstance(regionName, cache,
HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haRgnQType, isDurable, container, null,
HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
}
@@ -2009,12 +1936,8 @@ public class HARegionQueue implements RegionQueue {
* @param isPrimary whether this is the primary queue for the client
* @param canHandleDelta boolean indicating whether the HARegionQueue can handle delta or not
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, GemFireCacheImpl cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable, Map haContainer,
ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
boolean canHandleDelta)
@@ -2038,12 +1961,11 @@ public class HARegionQueue implements RegionQueue {
default:
throw new IllegalArgumentException(
LocalizedStrings.HARegionQueue_HARGNQTYPE_CAN_EITHER_BE_BLOCKING_0_OR_NON_BLOCKING_1
- .toLocalizedString(new Object[] {Integer.valueOf(BLOCKING_HA_QUEUE),
- Integer.valueOf(NON_BLOCKING_HA_QUEUE)}));
+ .toLocalizedString(new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE}));
}
if (!isDurable) {
Integer expiryTime = Integer.getInteger(REGION_ENTRY_EXPIRY_TIME, hrqa.getExpiryTime());
- hrqa.setExpiryTime(expiryTime.intValue());
+ hrqa.setExpiryTime(expiryTime);
ExpirationAttributes ea =
new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE);
hrq.region.getAttributesMutator().setEntryTimeToLive(ea);
@@ -2054,19 +1976,10 @@ public class HARegionQueue implements RegionQueue {
/**
* Creates a HARegionQueue object with default attributes. used by tests
*
- * @param regionName
- * @param cache
- * @param hrqa
- * @param haRgnQType
- * @param isDurable
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
Map container = null;
@@ -2078,8 +1991,8 @@ public class HARegionQueue implements RegionQueue {
container = new HashMap();
}
- return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache, hrqa, haRgnQType,
- isDurable, container, null, HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
+ return getHARegionQueueInstance(regionName, cache, hrqa, haRgnQType, isDurable, container, null,
+ HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
}
public boolean isEmptyAckList() {
@@ -2122,7 +2035,7 @@ public class HARegionQueue implements RegionQueue {
if (this.destroyFromAvailableIDsAndRegion(counter)) {
stats.incEventsRemoved();
}
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -2134,18 +2047,14 @@ public class HARegionQueue implements RegionQueue {
}
}
-
-
/**
- * This is an implemention of RegionQueue where peek() & take () are blocking operation and will
+ * This is an implementation of RegionQueue where peek() & take () are blocking operation and will
* not return unless it gets some legitimate value The Lock object used by this class is a
* ReentrantLock & not a ReadWriteLock as in the base class. This reduces the concurrency of peek
* operations, but it enables the condition object of the ReentrantLock used to guard the
* idsAvailable Set for notifying blocking peek & take operations. Previously a separate Lock
* object was used by the BlockingQueue for wait notify. This class will be performant if there is
* a single peek thread.
- *
- *
*/
private static class BlockingHARegionQueue extends HARegionQueue {
/**
@@ -2180,19 +2089,11 @@ public class HARegionQueue implements RegionQueue {
protected final StoppableCondition blockCond;
/**
- *
- * @param regionName
- * @param cache
* @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can
* be set
- * @param haContainer
* @param isPrimary whether this is the primary queue for a client
- * @throws IOException TODO-javadocs
- * @throws ClassNotFoundException TODO-javadocs
- * @throws CacheException TODO-javadocs
- * @throws InterruptedException
*/
- protected BlockingHARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected BlockingHARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2231,12 +2132,9 @@ public class HARegionQueue implements RegionQueue {
* This effectively makes the blocking queue behave like a non-blocking queue which throttles
* puts if it reaches its capacity. This was changed in 8.1, see #51400. This function is NOOP
* in the HARegionQueue.
- *
- * <p>
- * author Asif
*/
@Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
void checkQueueSizeConstraint() throws InterruptedException {
if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
if (Thread.interrupted())
@@ -2261,8 +2159,8 @@ public class HARegionQueue implements RegionQueue {
this.maxQueueSizeHitCount = 0;
}
++this.maxQueueSizeHitCount;
- // for (;;) {
this.region.checkReadiness(); // fix for bug 37581
+ // TODO: wait called while holding two locks
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
this.region.checkReadiness(); // fix for bug 37581
// Fix for #51400. Allow the queue to grow beyond its
@@ -2270,15 +2168,13 @@ public class HARegionQueue implements RegionQueue {
// drain the queue, either due to a slower client or the
// deadlock scenario mentioned in the ticket.
reconcilePutPermits();
- // }
if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
logger.info(LocalizedMessage
.create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
}
} catch (InterruptedException ex) {
- // TODO:Asif: The line below is meaningless. Comment it out
- // later
- this.permitMon.notify();
+ // TODO: The line below is meaningless. Comment it out later
+ this.permitMon.notifyAll();
throw ex;
}
}
@@ -2292,14 +2188,10 @@ public class HARegionQueue implements RegionQueue {
/**
* This function should always be called under a lock on putGuard & permitMon object
- *
- * <p>
- * author Asif
- *
- * @return int currnet Put permits
+ *
+ * @return int current Put permits
*/
private int reconcilePutPermits() {
-
putPermits += takeSidePutPermits;
takeSidePutPermits = 0;
return putPermits;
@@ -2312,10 +2204,6 @@ public class HARegionQueue implements RegionQueue {
* added in case a put operation which has reduced the put permit optimistically but due to some
* reason ( most likely because of duplicate event) was not added in the queue. In such case it
* will increment take side permit without notifying any waiting thread
- *
- * <p>
- * author Asif
- *
*/
@Override
void incrementTakeSidePutPermitsWithoutNotify() {
@@ -2328,24 +2216,19 @@ public class HARegionQueue implements RegionQueue {
* Implemented to reduce contention between concurrent take/remove operations and put. The
* reconciliation between take side put permits & put side put permits happens only if the put
* side put permits are exhausted. In HARegionQueue base class this is a NOOP function
- *
- * <p>
- * author Asif
- *
*/
@Override
void incrementTakeSidePutPermits() {
if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
synchronized (this.permitMon) {
++this.takeSidePutPermits;
- this.permitMon.notify();
+ this.permitMon.notifyAll();
}
}
}
/**
* Identical to the acquireReadLock as there is only one type of Lock object in this class.
- *
*/
@Override
void acquireWriteLock() {
@@ -2378,8 +2261,6 @@ public class HARegionQueue implements RegionQueue {
* acquiring the lock on ReentrantLock object. It blocks the thread if the queue is empty or
* returns true otherwise. This will always return true indicating that data is available for
* retrieval or throw an Exception. It can never return false.
- *
- * @throws InterruptedException
*/
@Override
boolean waitForData() throws InterruptedException {
@@ -2443,7 +2324,7 @@ public class HARegionQueue implements RegionQueue {
LinkedList unremovedElements = null;
HashMap currDurableMap = null;
- protected DurableHARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected DurableHARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2604,11 +2485,11 @@ public class HARegionQueue implements RegionQueue {
this.releaseWriteLock();
}
}
- /**
- * ashetkar: Setting this threadlocal variable to null has no use as the current thread never
- * uses it. Instead it should really be set null by message dispatcher thread while starting
- * or resuming. This was added in revision 20914. Need to check if it really needs to be
- * thread local.
+ /*
+ * Setting this threadlocal variable to null has no use as the current thread never uses it.
+ * Instead it should really be set null by message dispatcher thread while starting or
+ * resuming. This was added in revision 20914. Need to check if it really needs to be thread
+ * local.
*/
peekedEventsContext.set(null);
this.threadIdToSeqId.list.clear();
@@ -2675,38 +2556,27 @@ public class HARegionQueue implements RegionQueue {
* bridge between the user defined HARegionQueue class & the actual class. This class object will
* be buggy as it will tend to publish the Object to QRM thread & the expiry thread before the
* complete creation of the HARegionQueue instance
- *
- * <p>
- * author Asif
- *
*/
static class TestOnlyHARegionQueue extends HARegionQueue {
/**
* Overloaded constructor to accept haContainer.
*
- * @param regionName
- * @param cache
- * @param haContainer
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- TestOnlyHARegionQueue(String regionName, Cache cache, Map haContainer)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, Map haContainer)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
- haContainer, HandShake.CONFLATION_DEFAULT, false);
+ this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haContainer,
+ HandShake.CONFLATION_DEFAULT, false);
this.initialized.set(true);
}
- TestOnlyHARegionQueue(String regionName, Cache cache)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
- new HashMap(), HandShake.CONFLATION_DEFAULT, false);
+ this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, new HashMap(),
+ HandShake.CONFLATION_DEFAULT, false);
}
- TestOnlyHARegionQueue(String regionName, GemFireCacheImpl cache, HARegionQueueAttributes hrqa,
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa,
Map haContainer, final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
super(regionName, cache, hrqa, haContainer, null, clientConflation, isPrimary);
@@ -2718,34 +2588,21 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Overloaded constructor to pass an <code>HashMap</code> instance as a haContainer.
+ * Overloaded constructor to pass an {@code HashMap} instance as a haContainer.
*
- * @param regionName
- * @param cache
- * @param hrqa
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- TestOnlyHARegionQueue(String regionName, Cache cache, HARegionQueueAttributes hrqa)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT,
- false);
+ this(regionName, cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT, false);
}
}
/**
* This thread will check for messages which have been dispatched. After a configurable time or
- * size is reached, it will create a new <code>QueueRemovalMessage</code> and send it to all the
- * nodes in the DistributedSystem
- *
- * <p>
- * author Mitul Bid
- *
+ * size is reached, it will create a new {@code QueueRemovalMessage} and send it to all the nodes
+ * in the DistributedSystem
*/
-
private static class QueueRemovalThread extends Thread {
/**
@@ -2753,14 +2610,14 @@ public class HARegionQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
/**
* Constructor : Creates and initializes the thread
*/
- public QueueRemovalThread(GemFireCacheImpl c) {
+ public QueueRemovalThread(InternalCache cache) {
this.setDaemon(true);
- this.cache = c;
+ this.cache = cache;
}
private boolean checkCancelled() {
@@ -2775,11 +2632,7 @@ public class HARegionQueue implements RegionQueue {
/**
* The thread will check the dispatchedMessages map for messages that have been dispatched. It
- * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
- */
- /**
- * The thread will check the dispatchedMessages map for messages that have been dispatched. It
- * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
+ * will create a new {@code QueueRemovalMessage} and send it to the other nodes
*/
@Override
public void run() {
@@ -2836,7 +2689,7 @@ public class HARegionQueue implements RegionQueue {
dm.putOutgoing(qrm);
} // messages exist
} // be somewhat tolerant of failures
- catch (CancelException e) {
+ catch (CancelException ignore) {
if (logger.isDebugEnabled()) {
logger.debug("QueueRemovalThread is exiting due to cancellation");
}
@@ -2918,14 +2771,14 @@ public class HARegionQueue implements RegionQueue {
* threadIdToSequenceIdMap.list.add(internalMap); } }
*/
// first add the size within the lock
- queueRemovalMessageList.add(Integer.valueOf(internalMap.size()));
+ queueRemovalMessageList.add(internalMap.size());
internalIterator = internalMap.entrySet().iterator();
// then add the event ids to the message list within the lock
while (internalIterator.hasNext()) {
internalEntry = (Map.Entry) internalIterator.next();
tid = (ThreadIdentifier) internalEntry.getKey();
sequenceId = (Long) internalEntry.getValue();
- eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId.longValue());
+ eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId);
queueRemovalMessageList.add(eventId);
}
}
@@ -2945,7 +2798,7 @@ public class HARegionQueue implements RegionQueue {
boolean interrupted = Thread.interrupted();
try {
this.join(15 * 1000);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -2960,13 +2813,9 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Class whick keeps track of the positions ( keys) of underlying Region object for the events
+ * Class which keeps track of the positions ( keys) of underlying Region object for the events
* placed in the Queue. It also keeps track of the last sequence ID dispatched. Thus all the
* events with sequence ID less than that dispatched are eligible for removal
- *
- * <p>
- * author Asif
- *
*/
public static class DispatchedAndCurrentEvents implements DataSerializableFixedID, Serializable {
/**
@@ -3006,10 +2855,6 @@ public class HARegionQueue implements RegionQueue {
/**
* Used for debugging purpose to ensure that in no situation , for a given ThreadIdentifier the
* order gets violated
- *
- * <p>
- * author Asif
- *
*/
protected volatile long lastSequenceIDPut = INIT_OF_SEQUENCEID;
@@ -3024,15 +2869,9 @@ public class HARegionQueue implements RegionQueue {
* @param event Object to be added to the queue
* @param sequenceID Sequence ID of the event originating from a unique thread identified by its
* ThreadIdentifier
- * @throws CacheException
- * @throws InterruptedException
*/
protected boolean putObject(Conflatable event, long sequenceID)
throws CacheException, InterruptedException {
- // logger.debug("BRUCE: putObject() lastSequenceIDPut="+lastSequenceIDPut
- // +"; adding sequenceID="+sequenceID + " for " + event);
- // logger.info("putObject, sequenceID = " + sequenceID + "; lastSequenceIDPut = " +
- // lastSequenceIDPut, new Exception("putObject"));
Long oldPosition = null;
final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER);
if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID
@@ -3072,7 +2911,7 @@ public class HARegionQueue implements RegionQueue {
}
if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) {
// Insert the object into the Region
- Long position = Long.valueOf(owningQueue.tailKey.incrementAndGet());
+ Long position = owningQueue.tailKey.incrementAndGet();
owningQueue.putEventInHARegion(event, position);
@@ -3128,15 +2967,11 @@ public class HARegionQueue implements RegionQueue {
/**
* Destroys the the old entry ( which got replaced by the new entry due to conflation) from the
- * availableIDs , Region & Counters set. Since this is executed within a synch block by the new
+ * availableIDs , Region & Counters set. Since this is executed within a sync block by the new
* entry thread, it is guaranteed that the old entry thread will exit first , placing the
- * poistion etc in the available IDs set. Also the new entry thraed & old entry thread are
- * belonging to diffrenet ThreadIdentifier objects & hence hold different
+ * position etc in the available IDs set. Also the new entry thread & old entry thread are
+ * belonging to different ThreadIdentifier objects & hence hold different
* DispatchedAndCurrentEvents object.
- *
- * @param oldPosition
- * @throws CacheException
- * @throws InterruptedException
*/
private void removeOldConflatedEntry(Long oldPosition)
throws CacheException, InterruptedException {
@@ -3152,9 +2987,8 @@ public class HARegionQueue implements RegionQueue {
}
// </HA overflow>
// update statistics
- // if (logger.isDebugEnabled()) {
- // vrao: Fix for bug 39291:
+ // Fix for bug 39291:
// Since markers are always conflated regardless of the conflation
// setting and they are not normal (are internal) events, we should
// not bump the events-conflated stat for markers.
@@ -3163,7 +2997,6 @@ public class HARegionQueue implements RegionQueue {
} else {
owningQueue.stats.incMarkerEventsConflated();
}
- // }
}
}
}
@@ -3184,13 +3017,10 @@ public class HARegionQueue implements RegionQueue {
ConcurrentMap conflationMap = (ConcurrentMap) owningQueue.indexes.get(rName);
Assert.assertTrue(conflationMap != null);
conflationMap.remove(key, position);
-
}
/**
* Removes the Entry from the Counters Set contained in DACE
- *
- * @param position
*/
protected synchronized void destroy(Long position) {
if (this.counters != null) {
@@ -3227,17 +3057,17 @@ public class HARegionQueue implements RegionQueue {
owningQueue.eventsMap.remove(ti);
expired = true;
this.owningQueue.getStatistics().decThreadIdentifiers();
- } catch (RegionDestroyedException ignore) {
+ } catch (RegionDestroyedException e) {
if (!owningQueue.destroyInProgress && logger.isDebugEnabled()) {
logger.debug(
"DispatchedAndCurrentEvents::expireOrUpdate: Queue found destroyed while removing expiry entry for ThreadIdentifier={} and expiry value={}",
- ti, expVal, ignore);
+ ti, expVal, e);
}
} catch (EntryNotFoundException enfe) {
if (!owningQueue.destroyInProgress) {
logger.error(LocalizedMessage.create(
LocalizedStrings.HARegionQueue_DISPATCHEDANDCURRENTEVENTSEXPIREORUPDATE_UNEXPECTEDLY_ENCOUNTERED_EXCEPTION_WHILE_REMOVING_EXPIRY_ENTRY_FOR_THREADIDENTIFIER_0_AND_EXPIRY_VALUE_1,
- new Object[] {ti, Long.valueOf(expVal), enfe}));
+ new Object[] {ti, expVal, enfe}));
}
}
}
@@ -3245,7 +3075,7 @@ public class HARegionQueue implements RegionQueue {
if (!expired) {
try {
// Update the entry with latest sequence ID
- owningQueue.region.put(ti, Long.valueOf(this.lastDispatchedSequenceId));
+ owningQueue.region.put(ti, this.lastDispatchedSequenceId);
} catch (CancelException e) {
throw e;
} catch (Exception e) {
@@ -3267,8 +3097,6 @@ public class HARegionQueue implements RegionQueue {
* be removed
*
* @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
- * @throws CacheException
- * @throws InterruptedException
*/
protected void setLastDispatchedIDAndRemoveEvents(long lastDispatchedSeqId)
throws CacheException, InterruptedException {
@@ -3341,8 +3169,6 @@ public class HARegionQueue implements RegionQueue {
* Events which have been peeked & are now candidate for removal. It has to be guaranteed
* that the sequence IDs of all the other counters is less than the last dispatched
* @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
- * @throws CacheException
- * @throws InterruptedException
*/
protected void setLastDispatchedIDAndRemoveEvents(List removedEventInfoList,
long lastDispatchedSeqId) throws CacheException, InterruptedException {
@@ -3472,8 +3298,6 @@ public class HARegionQueue implements RegionQueue {
/**
* destroys the underlying HARegion and removes its reference from the dispatched messages map
- *
- * @throws CacheWriterException
*/
public void destroy() throws CacheWriterException {
this.destroyInProgress = true;
@@ -3484,9 +3308,9 @@ public class HARegionQueue implements RegionQueue {
try {
try {
updateHAContainer();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// keep going
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// keep going
if (logger.isDebugEnabled()) {
logger.debug("HARegionQueue#destroy: ignored cancellation!!!!");
@@ -3495,9 +3319,9 @@ public class HARegionQueue implements RegionQueue {
try {
this.region.destroyRegion();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// keep going
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// keep going
}
((HAContainerWrapper) haContainer).removeProxy(regionName);
@@ -3510,8 +3334,6 @@ public class HARegionQueue implements RegionQueue {
* If the event is an instance of HAEventWrapper, put it into the haContainer and then into the ha
* region. Otherwise, simply put it into the ha region.
*
- * @param event
- * @param position
* @since GemFire 5.7
*/
protected void putEventInHARegion(Conflatable event, Long position) {
@@ -3623,7 +3445,7 @@ public class HARegionQueue implements RegionQueue {
* If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its
* clientUpdateMessage into the haContainer as <key, value>.
*
- * @param haEventWrapper An instance of <code>HAEventWrapper</code>
+ * @param haEventWrapper An instance of {@code HAEventWrapper}
* @since GemFire 5.7
*/
protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) {
@@ -3690,7 +3512,7 @@ public class HARegionQueue implements RegionQueue {
Object[] wrapperArray = null;
acquireReadLock();
try {
- if (!(this.availableIDsSize() == 0)) {
+ if (this.availableIDsSize() != 0) {
wrapperArray = this.availableIDsArray();
}
} finally {
@@ -3714,7 +3536,7 @@ public class HARegionQueue implements RegionQueue {
HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
}
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
return; // we're done
} catch (Exception e) {
if (logger.isDebugEnabled()) {
@@ -3751,10 +3573,9 @@ public class HARegionQueue implements RegionQueue {
* If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
* in the haContainer, then decrements its reference count. If the decremented ref count is zero
* and put is not in progress, removes the entry from the haContainer, before returning the
- * <code>ClientUpdateMessage</code> instance.
+ * {@code ClientUpdateMessage} instance.
*
- * @param conflatable
- * @return An instance of <code>ClientUpdateMessage</code>
+ * @return An instance of {@code ClientUpdateMessage}
* @since GemFire 5.7
*/
public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) {
@@ -3778,7 +3599,6 @@ public class HARegionQueue implements RegionQueue {
* Decrements wrapper's reference count by one. If the decremented ref count is zero and put is
* not in progress, removes the entry from the haContainer.
*
- * @param wrapper
* @since GemFire 5.7
*/
public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) {
@@ -3813,7 +3633,7 @@ public class HARegionQueue implements RegionQueue {
/**
* Set whether the dispatcher of this node is active or not (i.e. primary or secondary node). If
- * <code>flag</code> is set to <code>true</code>, disables Entry Expiry Tasks.
+ * {@code flag} is set to {@code true}, disables Entry Expiry Tasks.
*
* @param flag the value to set isPrimary to
*/
@@ -3830,7 +3650,7 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Disables EntryExpiryTask for the HARegion (<code>this.region</code>).
+ * Disables EntryExpiryTask for the HARegion ({@code this.region}).
*
*/
private void disableEntryExpiryTasks() {
@@ -3842,7 +3662,7 @@ public class HARegionQueue implements RegionQueue {
this.region.setCustomEntryTimeToLive(new ThreadIdentifierCustomExpiry());
logger.info(LocalizedMessage.create(
LocalizedStrings.HARegionQueue_ENYTRY_EXPIRY_TASKS_DISABLED_BECAUSE_QUEUE_BECAME_PRIMARY_OLD_MSG_TTL_0,
- new Object[] {Integer.valueOf(oldTimeToLive)}));
+ new Object[] {oldTimeToLive}));
}
}
@@ -3882,7 +3702,7 @@ public class HARegionQueue implements RegionQueue {
if (r != null && !r.isDestroyed()) {
try {
r.close();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
}
}
}
@@ -3895,97 +3715,83 @@ public class HARegionQueue implements RegionQueue {
*/
public boolean isPeekInitialized() {
return HARegionQueue.peekedEventsContext.get() != null;
-
}
-}
-
-
-/**
- * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is operating
- * on it. This wrapper acts as a means of communication between the QRM thread & the MapWrapper
- * object contained in the HARegionQueue
- *
- * <p>
- * author ashahid
- */
-
-class MapWrapper {
- Map map;
+ /**
+ * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is
+ * operating on it. This wrapper acts as a means of communication between the QRM thread & the
+ * MapWrapper object contained in the HARegionQueue
+ */
+ static class MapWrapper {
+ Map map;
- List list;
+ List list;
- boolean keepPrevAcks = false;
+ boolean keepPrevAcks = false;
- public MapWrapper() {
- super();
- map = new HashMap();
- list = new LinkedList();
- }
+ public MapWrapper() {
+ super();
+ map = new HashMap();
+ list = new LinkedList();
+ }
- void put(Object key, Object o) {
- synchronized (this.map) {
- this.map.put(key, o);
+ void put(Object key, Object o) {
+ synchronized (this.map) {
+ this.map.put(key, o);
+ }
}
}
-}
-
-/**
- * A wrapper class that has counter, key and the region-name for an event which was peeked and needs
- * to be removed. The key and regionName fields will be set only if conflation is true for the
- * event.
- *
- * <p>
- * author dpatel
- *
- */
-
-class RemovedEventInfo {
- Long counter;
+ /**
+ * A wrapper class that has counter, key and the region-name for an event which was peeked and
+ * needs to be removed. The key and regionName fields will be set only if conflation is true for
+ * the event.
+ */
+ static class RemovedEventInfo {
+ Long counter;
- String regionName;
+ String regionName;
- Object key;
+ Object key;
- public RemovedEventInfo(Long counter, String regionName, Object key) {
- this.counter = counter;
- this.regionName = regionName;
- this.key = key;
+ public RemovedEventInfo(Long counter, String regionName, Object key) {
+ this.counter = counter;
+ this.regionName = regionName;
+ this.key = key;
+ }
}
-}
+ /** this is used to expire thread identifiers, even in primary queues */
+ static class ThreadIdentifierCustomExpiry implements CustomExpiry {
+ private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
+ HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
+ private static volatile ExpirationAttributes testExpAtts = null;
-/** this is used to expire thread identifiers, even in primary queues */
-class ThreadIdentifierCustomExpiry implements CustomExpiry {
- private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
- HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
- private static volatile ExpirationAttributes testExpAtts = null;
-
- public ExpirationAttributes getExpiry(Region.Entry entry) {
- // Use key to determine expiration.
- Object key = entry.getKey();
- if (key instanceof ThreadIdentifier) {
- final int expTime = HARegionQueue.threadIdExpiryTime;
- if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
- // This should only happen in unit test code
- ExpirationAttributes result = testExpAtts;
- if (result == null || result.getTimeout() != expTime) {
- result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
- // save the expiration attributes in a static to prevent tests from creating lots of
- // instances.
- testExpAtts = result;
+ public ExpirationAttributes getExpiry(Region.Entry entry) {
+ // Use key to determine expiration.
+ Object key = entry.getKey();
+ if (key instanceof ThreadIdentifier) {
+ final int expTime = HARegionQueue.threadIdExpiryTime;
+ if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
+ // This should only happen in unit test code
+ ExpirationAttributes result = testExpAtts;
+ if (result == null || result.getTimeout() != expTime) {
+ result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
+ // save the expiration attributes in a static to prevent tests from creating lots of
+ // instances.
+ testExpAtts = result;
+ }
+ return result;
+ } else {
+ return DEFAULT_THREAD_ID_EXP_ATTS;
}
- return result;
} else {
- return DEFAULT_THREAD_ID_EXP_ATTS;
+ return null;
}
- } else {
- return null;
}
- }
- public void close() {}
+ public void close() {}
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index ca9cc20..7879538 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -33,6 +33,7 @@ import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -42,34 +43,17 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
* This message is sent to all the nodes in the DistributedSystem. It contains the list of messages
* that have been dispatched by this node. The messages are received by other nodes and the
* processing is handed over to an executor
- *
- *
*/
public final class QueueRemovalMessage extends PooledDistributionMessage {
private static final Logger logger = LogService.getLogger();
- // /**
- // * Executor for processing incoming messages
- // */
- // private static final Executor executor;
-
-
/**
* List of messages (String[] )
*/
private List messagesList;
- // /**
- // * create the executor in a static block
- // */
- // static {
- // //TODO:Mitul best implementation of executor for this task?
- // executor = Executors.newCachedThreadPool();
- // }
-
/**
* Constructor : Set the recipient list to ALL_RECIPIENTS
- *
*/
public QueueRemovalMessage() {
this.setRecipient(ALL_RECIPIENTS);
@@ -77,8 +61,6 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
/**
* Set the message list
- *
- * @param messages
*/
public void setMessagesList(List messages) {
this.messagesList = messages;
@@ -87,22 +69,19 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
/**
* Extracts the region from the message list and hands over the message removal task to the
* executor
- *
- * @param dm
*/
@Override
protected void process(DistributionManager dm) {
-
- final GemFireCacheImpl cache;
+ final InternalCache cache;
// use GemFireCache.getInstance to avoid blocking during cache.xml processing.
- cache = GemFireCacheImpl.getInstance(); // CacheFactory.getAnyInstance();
+ cache = GemFireCacheImpl.getInstance();
if (cache != null) {
Iterator iterator = this.messagesList.iterator();
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
try {
while (iterator.hasNext()) {
final String regionName = (String) iterator.next();
- final int size = ((Integer) iterator.next()).intValue();
+ final int size = (Integer) iterator.next();
final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
final HARegionQueue hrq;
if (region == null || !region.isInitialized()) {
@@ -134,21 +113,21 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
regionName, id);
}
hrq.removeDispatchedEvents(id);
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
logger.info(LocalizedMessage.create(
LocalizedStrings.QueueRemovalMessage_QUEUE_FOUND_DESTROYED_WHILE_PROCESSING_THE_LAST_DISPTACHED_SEQUENCE_ID_FOR_A_HAREGIONQUEUES_DACE_THE_EVENT_ID_IS_0_FOR_HAREGION_WITH_NAME_1,
new Object[] {id, regionName}));
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
return; // cache or DS is closing
} catch (CacheException e) {
logger.error(LocalizedMessage.create(
LocalizedStrings.QueueRemovalMessage_QUEUEREMOVALMESSAGEPROCESSEXCEPTION_IN_PROCESSING_THE_LAST_DISPTACHED_SEQUENCE_ID_FOR_A_HAREGIONQUEUES_DACE_THE_PROBLEM_IS_WITH_EVENT_ID__0_FOR_HAREGION_WITH_NAME_1,
new Object[] {regionName, id}), e);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
return; // interrupt occurs during shutdown. this runs in an executor, so just stop
// processing
}
- } catch (RejectedExecutionException e) {
+ } catch (RejectedExecutionException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -165,14 +144,13 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
@Override
public void toData(DataOutput out) throws IOException {
- /**
+ /*
* first write the total list size then in a loop write the region name, number of eventIds and
* the event ids
- *
*/
super.toData(out);
// write the size of the data list
- DataSerializer.writeInteger(Integer.valueOf(this.messagesList.size()), out);
+ DataSerializer.writeInteger(this.messagesList.size(), out);
Iterator iterator = messagesList.iterator();
String regionName = null;
Integer numberOfIds = null;
@@ -185,7 +163,7 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
numberOfIds = (Integer) iterator.next();
// write the number of event ids
DataSerializer.writeInteger(numberOfIds, out);
- maxVal = numberOfIds.intValue();
+ maxVal = numberOfIds;
// write the event ids
for (int i = 0; i < maxVal; i++) {
eventId = iterator.next();
@@ -200,14 +178,13 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- /**
+ /*
* read the total list size, reconstruct the message list in a loop by reading the region name,
* number of eventIds and the event ids
- *
*/
super.fromData(in);
// read the size of the message
- int size = DataSerializer.readInteger(in).intValue();
+ int size = DataSerializer.readInteger(in);
this.messagesList = new LinkedList();
int eventIdSizeInt;
for (int i = 0; i < size; i++) {
@@ -216,7 +193,7 @@ public final class QueueRemovalMessage extends PooledDistributionMessage {
// read the datasize
Integer eventIdSize = DataSerializer.readInteger(in);
this.messagesList.add(eventIdSize);
- eventIdSizeInt = eventIdSize.intValue();
+ eventIdSizeInt = eventIdSize;
// read the total number of events
for (int j = 0; j < eventIdSizeInt; j++) {
this.messagesList.add(DataSerializer.readObject(in));
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
index df6e2f2..eeb3704 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BecomePrimaryBucketMessage.java
@@ -134,7 +134,7 @@ public class BecomePrimaryBucketMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId);
buff.append("; isRebalance=").append(this.isRebalance);
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
index 04349aa..ed17740 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/BucketSizeMessage.java
@@ -114,7 +114,7 @@ public final class BucketSizeMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
index 3cca861..d6422c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ContainsKeyValueMessage.java
@@ -143,7 +143,7 @@ public final class ContainsKeyValueMessage extends PartitionMessageWithDirectRep
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; valueCheck=").append(this.valueCheck).append("; key=").append(this.key)
.append("; bucketId=").append(this.bucketId);
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
index 03b5ded..744e013 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
@@ -172,9 +172,10 @@ public final class CreateBucketMessage extends PartitionMessage {
* Assists the toString method in reporting the contents of this message
*
* @see PartitionMessage#toString()
+ * @param buff
*/
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId).append("; bucketSize=")
.append(this.bucketSize);
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
index e765df0..f305fa8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DeposePrimaryBucketMessage.java
@@ -113,7 +113,7 @@ public class DeposePrimaryBucketMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index dc55835..bffaf4d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -419,9 +419,10 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
* Assists the toString method in reporting the contents of this message
*
* @see PartitionMessage#toString()
+ * @param buff
*/
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; key=").append(getKey());
if (originalSender != null) {
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
index e9468dc..6314f2d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DumpB2NRegion.java
@@ -315,7 +315,7 @@ public final class DumpB2NRegion extends PartitionMessage {
* StringBuffer)
*/
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append(" bucketId=").append(this.bucketId).append(" primaryInfoOnly=")
.append(this.onlyReturnPrimaryInfo);
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
index 4a09f94..0502d5f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/EndBucketCreationMessage.java
@@ -107,7 +107,7 @@ public class EndBucketCreationMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId);
buff.append("; newPrimary=").append(this.newPrimary);
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
index 7208baf..41186ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -47,7 +47,6 @@ import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.BucketDump;
import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.PartitionedRegion;
@@ -139,7 +138,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketIds);
buff.append("; recipient=").append(this.getRecipient());
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
index b0f052a..c7ca279 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntriesMessage.java
@@ -128,7 +128,7 @@ public final class FetchEntriesMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketId);
buff.append("; recipient=").append(this.getRecipient());
http://git-wip-us.apache.org/repos/asf/geode/blob/dc820690/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
index ae2ce37..301e154 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/FetchEntryMessage.java
@@ -174,7 +174,7 @@ public final class FetchEntryMessage extends PartitionMessage {
}
@Override
- protected void appendFields(StringBuffer buff) {
+ protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; key=").append(this.key);
}