You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/04 00:07:20 UTC
[06/54] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index 74034e4..ab6794b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -36,11 +36,10 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
-/**
- */
public class MemberFunctionExecutor extends AbstractExecution {
protected InternalDistributedSystem ds;
@@ -131,7 +130,7 @@ public class MemberFunctionExecutor extends AbstractExecution {
final FunctionContext context = new FunctionContextImpl(function.getId(),
getArgumentsForMember(localVM.getId()), resultSender);
boolean isTx = false;
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
isTx = cache.getTxManager().getTXState() == null ? false : true;
}
@@ -156,13 +155,9 @@ public class MemberFunctionExecutor extends AbstractExecution {
return localRC;
}
- /**
- * @param function
- * @param dest
- */
@Override
public void validateExecution(final Function function, final Set dest) {
- final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ final InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && cache.getTxManager().getTXState() != null) {
if (dest.size() > 1) {
throw new TransactionException(
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
index a3ae2c0..27542f5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
@@ -20,7 +20,6 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Region;
@@ -38,15 +37,12 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SetUtils;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
-/**
- *
- *
- */
public class MultiRegionFunctionExecutor extends AbstractExecution {
private final Set<Region> regions;
@@ -210,7 +206,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
LocalizedStrings.MemberFunctionExecutor_NO_MEMBER_FOUND_FOR_EXECUTING_FUNCTION_0
.toLocalizedString(function.getId()));
}
- final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ final InternalCache cache = GemFireCacheImpl.getInstance();
if (function.optimizeForWrite() && cache != null
&& cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
&& !MemoryThresholds.isLowMemoryExceptionDisabled()) {
@@ -218,7 +214,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
+ .toLocalizedString(function.getId(), sm),
sm);
}
setExecutionNodes(dest);
@@ -243,7 +239,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
Set<String> regionPathSet = memberToRegionMap.get(localVM);
Set<Region> regions = new HashSet<Region>();
if (regionPathSet != null) {
- Cache cache1 = GemFireCacheImpl.getInstance();
+ InternalCache cache1 = GemFireCacheImpl.getInstance();
for (String regionPath : regionPathSet) {
regions.add(cache1.getRegion(regionPath));
}
@@ -263,8 +259,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
MultiRegionFunctionResultWaiter waiter = new MultiRegionFunctionResultWaiter(ds,
localResultCollector, function, dest, memberArgs, resultSender, memberToRegionMap);
- ResultCollector reply = waiter.getFunctionResultFrom(dest, function, this);
- return reply;
+ return waiter.getFunctionResultFrom(dest, function, this);
}
return localResultCollector;
}
@@ -280,7 +275,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
PartitionedRegion pr = (PartitionedRegion) region;
Set<InternalDistributedMember> prMembers = pr.getRegionAdvisor().advisePrimaryOwners();
if (pr.isDataStore()) {
- GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache();
+ InternalCache cache = (InternalCache) region.getCache();
// Add local node
InternalDistributedMember localVm = cache.getMyId();
Set<String> regions = memberToRegions.get(localVm);
@@ -334,7 +329,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
memberToRegions.put(member, regions);
}
} else if (dp.withReplication()) {
- GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache();
+ InternalCache cache = (InternalCache) region.getCache();
// Add local node
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
@@ -345,7 +340,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
memberToRegions.put(local, regions);
}
} else if (region instanceof LocalRegion) {
- GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache();
+ InternalCache cache = (InternalCache) region.getCache();
// Add local node
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
@@ -366,9 +361,9 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
@Override
public void validateExecution(Function function, Set targetMembers) {
- GemFireCacheImpl cache = null;
+ InternalCache cache = null;
for (Region r : regions) {
- cache = (GemFireCacheImpl) r.getCache();
+ cache = (InternalCache) r.getCache();
break;
}
if (cache != null && cache.getTxManager().getTXState() != null) {
@@ -385,7 +380,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
} else if (!target.equals(funcTarget)) {
throw new TransactionDataNotColocatedException(
LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1
- .toLocalizedString(new Object[] {target, funcTarget}));
+ .toLocalizedString(target, funcTarget));
}
}
}
@@ -396,7 +391,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution {
Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
+ .toLocalizedString(function.getId(), sm),
sm);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
index c7a7d36..6e13ebc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
@@ -12,11 +12,9 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.execute;
import java.util.Iterator;
-import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.geode.cache.LowMemoryException;
@@ -30,16 +28,12 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SetUtils;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.MemoryThresholds;
import org.apache.geode.internal.i18n.LocalizedStrings;
-/**
- *
- *
- */
public class PartitionedRegionFunctionExecutor extends AbstractExecution {
private final PartitionedRegion pr;
@@ -332,16 +326,9 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
return buf.toString();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode.
- * cache.execute.Function, java.util.Set)
- */
@Override
public void validateExecution(Function function, Set targetMembers) {
- GemFireCacheImpl cache = pr.getGemFireCache();
+ InternalCache cache = pr.getGemFireCache();
if (cache != null && cache.getTxManager().getTXState() != null) {
if (targetMembers.size() > 1) {
throw new TransactionException(
@@ -356,7 +343,7 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution {
} else if (!target.equals(funcTarget)) {
throw new TransactionDataRebalancedException(
LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1
- .toLocalizedString(new Object[] {target, funcTarget}));
+ .toLocalizedString(target, funcTarget));
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
index 3a20dc3..18ba32b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
@@ -12,9 +12,12 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.execute;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.ProxyCache;
@@ -26,28 +29,23 @@ import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
/**
- *
* Executes Function with FunctionService#onRegion(Region region) in client server mode.
*
* @see FunctionService#onRegion(Region) *
* @since GemFire 5.8 LA
- *
*/
public class ServerRegionFunctionExecutor extends AbstractExecution {
private static final Logger logger = LogService.getLogger();
- final private LocalRegion region;
+ private final LocalRegion region;
private boolean executeOnBucketSet = false;
public ServerRegionFunctionExecutor(Region r, ProxyCache proxyCache) {
@@ -288,11 +286,12 @@ public class ServerRegionFunctionExecutor extends AbstractExecution {
}
return srp;
} else {
- StringBuffer message = new StringBuffer();
+ StringBuilder message = new StringBuilder();
message.append(srp).append(": ");
- message.append(
- "No available connection was found. Server Region Proxy is not available for this region "
- + region.getName());
+ message
+ .append(
+ "No available connection was found. Server Region Proxy is not available for this region ")
+ .append(region.getName());
throw new FunctionException(message.toString());
}
}
@@ -340,16 +339,9 @@ public class ServerRegionFunctionExecutor extends AbstractExecution {
return new ServerRegionFunctionExecutor(this, argument);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode.
- * cache.execute.Function, java.util.Set)
- */
@Override
public void validateExecution(Function function, Set targetMembers) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && cache.getTxManager().getTXState() != null) {
TXStateProxyImpl tx = (TXStateProxyImpl) cache.getTxManager().getTXState();
tx.getRealDeal(null, region);
@@ -357,7 +349,6 @@ public class ServerRegionFunctionExecutor extends AbstractExecution {
}
}
-
@Override
public ResultCollector execute(final String functionName) {
if (functionName == null) {
@@ -472,6 +463,4 @@ public class ServerRegionFunctionExecutor extends AbstractExecution {
public boolean getExecuteOnBucketSetFlag() {
return this.executeOnBucketSet;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java
index 13d8e18..f78de18 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java
@@ -21,18 +21,17 @@ import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.InternalEntity;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.RestAgent;
/**
* The FindRestEnabledServersFunction class is a gemfire function that gives details about REST
* enabled gemfire servers.
- * <p/>
*
* @since GemFire 8.1
*/
-
public class FindRestEnabledServersFunction extends FunctionAdapter implements InternalEntity {
+ private static final long serialVersionUID = 7851518767859544678L;
/**
* This property defines internal function that will get executed on each node to fetch active
@@ -40,20 +39,17 @@ public class FindRestEnabledServersFunction extends FunctionAdapter implements I
*/
public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID =
FindRestEnabledServersFunction.class.getName();
- private static final long serialVersionUID = 7851518767859544678L;
-
public void execute(FunctionContext context) {
-
try {
- GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getAnyInstance();
+ InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
DistributionConfig config = InternalDistributedSystem.getAnyInstance().getConfig();
String bindAddress = RestAgent.getBindAddressForHttpService(config);
final String protocolType = config.getHttpServiceSSLEnabled() ? "https" : "http";
- if (c.isRESTServiceRunning()) {
+ if (cache.isRESTServiceRunning()) {
context.getResultSender()
.lastResult(protocolType + "://" + bindAddress + ":" + config.getHttpServicePort());
@@ -62,7 +58,6 @@ public class FindRestEnabledServersFunction extends FunctionAdapter implements I
}
} catch (CacheClosedException ex) {
context.getResultSender().lastResult("");
-
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 7ee5a8d..9c2ea23 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -14,8 +14,52 @@
*/
package org.apache.geode.internal.cache.ha;
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.MirrorType;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.query.internal.CqQueryVsdStats;
import org.apache.geode.cache.query.internal.cq.CqService;
@@ -30,9 +74,20 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.internal.cache.tier.sockets.*;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.cache.tier.sockets.HandShake;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -41,17 +96,6 @@ import org.apache.geode.internal.util.concurrent.StoppableCondition;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.*;
/**
* An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be
@@ -72,26 +116,23 @@ import java.util.concurrent.locks.*;
*
* 30 May 2008: 5.7 onwards the underlying GemFire Region will continue to have key as counter(long)
* but the value will be a wrapper object(HAEventWrapper) which will be a key in a separate data
- * strucure called haContainer (an implementation of Map). The value against this wrapper will be
+ * structure called haContainer (an implementation of Map). The value against this wrapper will be
* the offered object in the queue. The purpose of this modification is to allow multiple
* ha-region-queues share their offered values without storing separate copies in memory, upon GII.
*
* (See BlockingHARegionQueue)
*
- *
* @since GemFire 4.3
- *
*/
public class HARegionQueue implements RegionQueue {
private static final Logger logger = LogService.getLogger();
- /** The <code>Region</code> backing this queue */
+ /** The {@code Region} backing this queue */
protected HARegion region;
/**
- * The key into the <code>Region</code> used when putting entries onto the queue. The counter uses
+ * The key into the {@code Region} used when putting entries onto the queue. The counter uses
* incrementAndGet so counter will always be started from 1
- *
*/
protected final AtomicLong tailKey = new AtomicLong(0);
@@ -100,26 +141,21 @@ public class HARegionQueue implements RegionQueue {
* object. Every add operation will be identified by the ThreadIdentifier object & the position
* recorded in the LastDispatchedAndCurrentEvents object.
*/
-
protected final ConcurrentMap eventsMap = new ConcurrentHashMap();
/**
- * The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
- * updating of entries in the queue for conflation.
+ * The {@code Map} mapping the regionName->key to the queue key. This index allows fast updating
+ * of entries in the queue for conflation.
*/
protected volatile Map indexes = Collections.unmodifiableMap(new HashMap());
- // TODO:Asif: Should we worry about whether to some how make it writer
- // preference?
- /** Lock object for updating the queue size by different operations */
- // private final Object SIZE_LOCK = new Object();
private final StoppableReentrantReadWriteLock rwLock;
private final StoppableReentrantReadWriteLock.StoppableReadLock readLock;
private final StoppableWriteLock writeLock;
- /** The name of the <code>Region</code> backing this queue */
+ /** The name of the {@code Region} backing this queue */
private final String regionName;
/** The ClientProxyMembershipID associated with the ha queue */
@@ -151,10 +187,7 @@ public class HARegionQueue implements RegionQueue {
* A sequence violation can occur , if an HARegionQueue receives events thru GII & the same event
* also arrives via Gemfire Put in that local VM. If the HARegionQueue does not receive any data
* via GII , then there should not be any violation. If there is data arriving thru GII, such
- * voiolations can be expected , but should be analyzed thoroughly.
- *
- * <p>
- * author Asif
+ * violations can be expected , but should be analyzed thoroughly.
*/
protected boolean puttingGIIDataInQueue;
@@ -166,14 +199,12 @@ public class HARegionQueue implements RegionQueue {
/**
* a thread local to store the counters corresponding to the events peeked by a particular thread.
- * When <code>remove()</code> will be called, these events stored in thread-local will be
- * destroyed.
+ * When {@code remove()} will be called, these events stored in thread-local will be destroyed.
*/
protected static final ThreadLocal peekedEventsContext = new ThreadLocal();
/**
- * Thread which creates the <code>QueueRemovalMessage</code> and sends it to other nodes in the
- * system
+ * Thread which creates the {@code QueueRemovalMessage} and sends it to other nodes in the system
*/
private static QueueRemovalThread qrmThread;
@@ -284,30 +315,18 @@ public class HARegionQueue implements RegionQueue {
protected long maxQueueSizeHitCount = 0;
/**
- *
* Processes the given string and returns a string which is allowed for region names
*
- * @param regionName
* @return legal region name
*/
public static String createRegionName(String regionName) {
- String result = regionName.replace('/', '#'); // [yogi]: region name cannot
- // contain the separator '/'
- return result;
+ return regionName.replace('/', '#');
}
/**
- *
- * @param regionName
- * @param cache
* @param isPrimary whether this is the primary queue for a client
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
*/
-
- protected HARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected HARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes haAttributes, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -332,13 +351,13 @@ public class HARegionQueue implements RegionQueue {
this.readLock = this.rwLock.readLock();
this.writeLock = this.rwLock.writeLock();
- this.putGIIDataInRegion();
+ putGIIDataInRegion();
if (this.getClass() == HARegionQueue.class) {
initialized.set(true);
}
}
- private void createHARegion(String processedRegionName, GemFireCacheImpl cache)
+ private void createHARegion(String processedRegionName, InternalCache cache)
throws IOException, ClassNotFoundException {
AttributesFactory af = new AttributesFactory();
af.setMirrorType(MirrorType.KEYS_VALUES);
@@ -358,7 +377,7 @@ public class HARegionQueue implements RegionQueue {
* reinitialize the queue, presumably pulling current information from secondaries
*/
public void reinitializeRegion() {
- GemFireCacheImpl cache = this.region.getCache();
+ InternalCache cache = this.region.getCache();
String regionName = this.region.getName();
this.region.destroyRegion();
Exception problem = null;
@@ -412,7 +431,7 @@ public class HARegionQueue implements RegionQueue {
// use putIfAbsent to avoid overwriting newer dispatch information
Object o = this.eventsMap.putIfAbsent(entry.getKey(), giiDace);
if (o != null && isDebugEnabled_BS) {
- sb.append(" -- could not store. found " + o);
+ sb.append(" -- could not store. found ").append(o);
}
}
}
@@ -425,11 +444,8 @@ public class HARegionQueue implements RegionQueue {
* Repopulates the HARegion after the GII is over so as to reset the counters and populate the
* DACE objects for the thread identifiers . This method should be invoked as the last method in
* the constructor . Thus while creating BlockingQueue this method should be invoked lastly in the
- * derived class contructor , after the HARegionQueue contructor is complete. Otherwise, the
+ * derived class constructor , after the HARegionQueue constructor is complete. Otherwise, the
* ReentrantLock will be null.
- *
- * @throws CacheException
- * @throws InterruptedException
*/
void putGIIDataInRegion() throws CacheException, InterruptedException {
Set entrySet = this.region.entries(false);
@@ -498,8 +514,6 @@ public class HARegionQueue implements RegionQueue {
* Puts the GII'd entry into the ha region, if it was GII'd along with its ClientUpdateMessageImpl
* instance.
*
- * @param val
- * @throws InterruptedException
* @since GemFire 5.7
*/
protected void putInQueue(Object val) throws InterruptedException {
@@ -507,7 +521,7 @@ public class HARegionQueue implements RegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(
"HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.",
- ((HAEventWrapper) val).getKeyToConflate());
+ ((Conflatable) val).getKeyToConflate());
}
} else {
this.put(val);
@@ -567,9 +581,6 @@ public class HARegionQueue implements RegionQueue {
* object & SIZE Lock
*
* @param object object to put onto the queue
- * @throws InterruptedException
- * @throws CacheException
- * @return boolean
*/
public boolean put(Object object) throws CacheException, InterruptedException {
this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
@@ -663,11 +674,9 @@ public class HARegionQueue implements RegionQueue {
dace = oldDace;
} else {
// Add the recently added ThreadIdentifier to the RegionQueue for expiry
- this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+ this.region.put(ti, dace.lastDispatchedSequenceId);
// update the stats
- // if (logger.isDebugEnabled()) {
this.stats.incThreadIdentifiers();
- // }
}
if (!dace.putObject(event, sequenceID)) {
this.put(object);
@@ -677,11 +686,6 @@ public class HARegionQueue implements RegionQueue {
}
}
}
- // update the stats
- // if (logger.isDebugEnabled()) {
- // this.stats.incEventsEnqued();
- // }
-
}
/**
@@ -691,7 +695,7 @@ public class HARegionQueue implements RegionQueue {
*/
public void startGiiQueueing() {
this.giiLock.writeLock().lock();
- this.giiCount++;
+ this.giiCount++; // TODO: non-atomic operation on volatile!
if (logger.isDebugEnabled()) {
logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount);
}
@@ -710,7 +714,7 @@ public class HARegionQueue implements RegionQueue {
this.giiLock.writeLock().lock();
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
- this.giiCount--;
+ this.giiCount--; // TODO: non-atomic operation on volatile!
if (isDebugEnabled) {
logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount);
}
@@ -731,15 +735,7 @@ public class HARegionQueue implements RegionQueue {
Object value;
try {
value = this.giiQueue.remove();
- } catch (NoSuchElementException e) {
- // if (actualCount != expectedCount) {
- // logger.severe(LocalizedStrings.DEBUG, "expected to drain "
- // + expectedCount + " messages but drained " + actualCount
- // + " queue.size() is now " + giiQueue.size() + ", in queue" + this, e);
- // } else {
- // logger.severe(LocalizedStrings.DEBUG, "drained " + actualCount + " messages. Queue
- // size is " + giiQueue.size() + " in " + this);
- // }
+ } catch (NoSuchElementException ignore) {
break;
}
actualCount++;
@@ -765,13 +761,11 @@ public class HARegionQueue implements RegionQueue {
if (value instanceof HAEventWrapper) {
decAndRemoveFromHAContainer((HAEventWrapper) value);
}
- } catch (NoSuchElementException e) {
+ } catch (NoSuchElementException ignore) {
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
// complete draining while holding the write-lock so nothing else
// can get into the queue
- // logger.severe(LocalizedStrings.DEBUG, "endGiiQueueing interrupted - ignoring until
- // draining completes");
interrupted = true;
}
}
@@ -804,19 +798,19 @@ public class HARegionQueue implements RegionQueue {
*/
public Map getEventMapForGII() {
// fix for bug #41621 - concurrent modification exception while serializing event map
- Map<ThreadIdentifier, DispatchedAndCurrentEvents> events = this.eventsMap;
final boolean isDebugEnabled = logger.isDebugEnabled();
do {
HashMap result = new HashMap();
try {
- for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : events.entrySet()) {
+ for (Map.Entry<ThreadIdentifier, DispatchedAndCurrentEvents> entry : ((Map<ThreadIdentifier, DispatchedAndCurrentEvents>) this.eventsMap)
+ .entrySet()) {
if (entry.getValue().isCountersEmpty()) {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
- } catch (ConcurrentModificationException e) { // TODO:WTF: bad practice but eventsMap is
- // ConcurrentHashMap
+ } catch (ConcurrentModificationException ignore) {
+ // TODO:WTF: bad practice but eventsMap is ConcurrentHashMap
if (isDebugEnabled) {
logger.debug(
"HARegion encountered concurrent modification exception while analysing event state - will try again");
@@ -826,9 +820,7 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Implementation in BlokcingHARegionQueue class
- *
- * @throws InterruptedException
+ * Implementation in BlockingHARegionQueue class
*/
void checkQueueSizeConstraint() throws InterruptedException {
if (Thread.interrupted())
@@ -846,13 +838,11 @@ public class HARegionQueue implements RegionQueue {
/**
* Creates the static dispatchedMessagesMap (if not present) and starts the QueuRemovalThread if
* not running
- *
*/
- public static synchronized void startHAServices(GemFireCacheImpl c) {
-
+ static synchronized void startHAServices(InternalCache cache) {
if (qrmThread == null) {
dispatchedMessagesMap = new ConcurrentHashMap();
- qrmThread = new QueueRemovalThread(c);
+ qrmThread = new QueueRemovalThread(cache);
qrmThread.setName("Queue Removal Thread");
qrmThread.start();
}
@@ -913,20 +903,16 @@ public class HARegionQueue implements RegionQueue {
}
}
Object key = event.getKeyToConflate();
- Long previousPosition = (Long) latestIndexesForRegion.put(key, newPosition);
- return previousPosition;
-
+ return (Long) latestIndexesForRegion.put(key, newPosition);
}
/**
- *
* Creates and returns a ConcurrentMap. This method is over-ridden in test classes to test some
* functionality
*
* @return new ConcurrentMap
*/
ConcurrentMap createConcurrentMap() {
-
return new ConcurrentHashMap();
}
@@ -948,7 +934,7 @@ public class HARegionQueue implements RegionQueue {
// if (!HARegionQueue.this.isPrimary()) {
HARegionQueue.this.expireTheEventOrThreadIdentifier(event);
// }
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// ignore, we're done
} catch (CacheException ce) {
if (!destroyInProgress) {
@@ -967,11 +953,8 @@ public class HARegionQueue implements RegionQueue {
* overridden function createCacheListenerForHARegion of the HARegionQueueJUnitTest class for the
* test testConcurrentEventExpiryAndTake. This function provides the meaningful functionality for
* expiry of the Event object as well as ThreadIdentifier
- * <p>
- * author Asif
- *
+ *
* @param event event object representing the data being expired
- * @throws CacheException
*/
void expireTheEventOrThreadIdentifier(EntryEvent event) throws CacheException {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -980,11 +963,6 @@ public class HARegionQueue implements RegionQueue {
"HARegionQueue::afterInvalidate. Entry Event being invalidated:{}, isPrimaryQueue:{}",
event, HARegionQueue.this.isPrimary());
}
- // if (HARegionQueue.this.isPrimary()) {
- // logger.info(LocalizedStrings.DEBUG,
- // "HARegionQueue: Entry Event being invalidated ="
- // + event+", after current queue became primary.");
- // }
Object key = event.getKey();
if (key instanceof ThreadIdentifier) {
// Check if the sequenceID present as value against this key is same
@@ -998,7 +976,7 @@ public class HARegionQueue implements RegionQueue {
(DispatchedAndCurrentEvents) HARegionQueue.this.eventsMap.get(key);
Assert.assertTrue(dace != null);
Long expirySequenceID = (Long) event.getOldValue();
- boolean expired = dace.expireOrUpdate(expirySequenceID.longValue(), (ThreadIdentifier) key);
+ boolean expired = dace.expireOrUpdate(expirySequenceID, (ThreadIdentifier) key);
if (isDebugEnabled) {
logger.debug(
"HARegionQueue::afterInvalidate:Size of the region after expiring or updating the ThreadIdentifier={}",
@@ -1028,21 +1006,18 @@ public class HARegionQueue implements RegionQueue {
/**
* This method adds the position of newly added object to the List of available IDs so that it is
- * avaialble for peek or take. This method is called from DispatchedAndCurrentEvents object. This
+ * available for peek or take. This method is called from DispatchedAndCurrentEvents object. This
* method is invoked in a write lock for non blocking queue & in a reentrant lock in a blocking
- * queue. In case of blokcing queue , this method also signals the waiting take & peek threads to
+ * queue. In case of blocking queue , this method also signals the waiting take & peek threads to
* awake.
- * <p>
- * author Asif
- *
+ *
* @param position The Long position of the object which has been added
- * @throws InterruptedException
*/
void publish(Long position) throws InterruptedException {
acquireWriteLock();
try {
this.idsAvailable.add(position);
- // Asif:Notify the wiating peek threads or take threads of blocking queue
+ // Notify the waiting peek threads or take threads of blocking queue
// A void operation for the non blocking queue operations
notifyPeekAndTakeThreads();
} finally {
@@ -1055,12 +1030,8 @@ public class HARegionQueue implements RegionQueue {
}
/**
- *
- * <p>
- * author Asif
- *
* @param position Long value present in the Available IDs map against which Event object is
- * present in HARegion. This function is directly ivnoked from the basicInvalidate function
+ * present in HARegion. This function is directly invoked from the basicInvalidate function
* where expiry is aborted if this function returns false
* @return boolean true if the position could be removed from the Set
* @throws InterruptedException *
@@ -1087,12 +1058,10 @@ public class HARegionQueue implements RegionQueue {
* in the AvailableID Set. If the position existed in the Set, then only it is removed from the
* Set & the underlying Region
*
- * @param position Long poistion counter for entry in the Region
+ * @param position Long position counter for entry in the Region
* @return true if the entry with <br>
* position <br>
* specified was removed from the Set
- * @throws InterruptedException
- *
*/
protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException {
boolean removedOK = this.destroyFromAvailableIDs(position);
@@ -1100,9 +1069,7 @@ public class HARegionQueue implements RegionQueue {
if (removedOK) {
try {
this.destroyFromQueue(position);
- } catch (EntryNotFoundException enfe) {
- // if (!this.region.isDestroyed()) {
- // if (!HARegionQueue.this.destroyInProgress || !this.region.isDestroyed()) {
+ } catch (EntryNotFoundException ignore) {
if (!HARegionQueue.this.destroyInProgress) {
if (!this.region.isDestroyed()) {
Assert.assertTrue(false, "HARegionQueue::remove: The position " + position
@@ -1128,7 +1095,7 @@ public class HARegionQueue implements RegionQueue {
maintainCqStats(event, -1);
}
- /** Returns the <code>toString</code> for this RegionQueue object */
+ /** Returns the {@code toString} for this RegionQueue object */
@Override
public String toString() {
return "RegionQueue on " + this.regionName + "(" + (this.isPrimary ? "primary" : "backup")
@@ -1144,12 +1111,9 @@ public class HARegionQueue implements RegionQueue {
/**
- * This method is inoked by the take function . For non blocking queue it returns null or a valid
+ * This method is invoked by the take function . For non blocking queue it returns null or a valid
* long position while for blocking queue it waits for data in the queue or throws Exception if
* the thread encounters exception while waiting.
- *
- * @throws CacheException
- * @throws InterruptedException
*/
protected Long getAndRemoveNextAvailableID() throws InterruptedException {
Long next = null;
@@ -1177,14 +1141,9 @@ public class HARegionQueue implements RegionQueue {
/**
* Returns the next position counter present in idsAvailable set. This method is invoked by the
* peek function. In case of BlockingQueue, this method waits till a valid ID is available.
- *
- * <p>
- * author Asif
- *
+ *
* @return valid Long poistion or null depending upon the nature of the queue
- * @throws InterruptedException
* @throws TimeoutException if operation is interrupted (unfortunately)
- *
*/
private Long getNextAvailableID() throws InterruptedException {
Long next = null;
@@ -1208,7 +1167,6 @@ public class HARegionQueue implements RegionQueue {
* For non blocking queue , this method either returns null or an Object. For blocking queue it
* will always return with an Object or wait for queue to be populated.
*
- * @throws InterruptedException
* @throws CacheException The exception can be thrown by BlockingQueue if it encounters
* InterruptedException while waiting for data
*/
@@ -1281,8 +1239,6 @@ public class HARegionQueue implements RegionQueue {
/**
* Removes the events that were peeked by this thread. The events are destroyed from the queue and
* conflation map and DispatchedAndCurrentEvents are updated accordingly.
- *
- * @throws InterruptedException
*/
public void remove() throws InterruptedException {
List peekedIds = (List) HARegionQueue.peekedEventsContext.get();
@@ -1324,10 +1280,10 @@ public class HARegionQueue implements RegionQueue {
List countersList;
if ((countersList = (List) groupedThreadIDs.get(threadid)) != null) {
countersList.add(info);
- countersList.set(0, Long.valueOf(sequenceId));
+ countersList.set(0, sequenceId);
} else {
countersList = new ArrayList();
- countersList.add(Long.valueOf(sequenceId));
+ countersList.add(sequenceId);
countersList.add(info);
groupedThreadIDs.put(threadid, countersList);
}
@@ -1344,7 +1300,7 @@ public class HARegionQueue implements RegionQueue {
Map.Entry element = (Map.Entry) iter.next();
ThreadIdentifier tid = (ThreadIdentifier) element.getKey();
List removedEvents = (List) element.getValue();
- long lastDispatchedId = ((Long) removedEvents.remove(0)).longValue();
+ long lastDispatchedId = (Long) removedEvents.remove(0);
DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(tid);
if (dace != null && dace.lastDispatchedSequenceId < lastDispatchedId) {
try {
@@ -1387,7 +1343,7 @@ public class HARegionQueue implements RegionQueue {
if (next == null) {
break;
}
- } catch (TimeoutException te) {
+ } catch (TimeoutException ignore) {
throw new InterruptedException();
}
object = (Conflatable) this.region.get(next);
@@ -1459,8 +1415,6 @@ public class HARegionQueue implements RegionQueue {
* @param timeToWait The number of milliseconds to attempt to peek
*
* @return The list of events peeked
- * @throws InterruptedException
- *
*/
public List peek(int batchSize, int timeToWait) throws InterruptedException {
long start = System.currentTimeMillis();
@@ -1506,7 +1460,7 @@ public class HARegionQueue implements RegionQueue {
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(50); // TODO this seems kinda busy IMNSHO -- jason
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
this.region.getCancelCriterion().checkCancelInProgress(null);
} finally {
@@ -1520,11 +1474,10 @@ public class HARegionQueue implements RegionQueue {
/**
* This method prepares the batch of events and updates the thread-context with corresponding
* counters, so that when remove is called by this thread, these events are destroyed from the
- * queue.This method should always be invoked within the <code>rwLock</code>.
+ * queue.This method should always be invoked within the {@code rwLock}.
*
* @param batchSize - number of events to be peeked
* @return - list of events peeked
- * @throws CacheException
*/
private List getBatchAndUpdateThreadContext(int batchSize) {
Iterator itr = this.idsAvailable.iterator();
@@ -1559,26 +1512,22 @@ public class HARegionQueue implements RegionQueue {
}
public void addCacheListener(CacheListener listener) {
- // TODO Auto-generated method stub
-
+ // nothing
}
public void removeCacheListener() {
- // TODO Auto-generated method stub
+ // nothing
}
/**
* It adds the entry to a static data structure dispatchedMessagesMap which is periodically
* operated upon by the QRM thread.
- *
- * <p>
- * author Asif
- *
+ *
* @param tid - the ThreadIdentifier object for this event
* @param sequenceId - the sequence id for this event
*/
public void addDispatchedMessage(ThreadIdentifier tid, long sequenceId) {
- Long lastSequenceNumber = Long.valueOf(sequenceId);
+ Long lastSequenceNumber = sequenceId;
boolean wasEmpty = false;
Long oldvalue = null;
Map internalMap = null;
@@ -1734,16 +1683,10 @@ public class HARegionQueue implements RegionQueue {
* creates a DACE. Only one QRM operates at a time on a DACE & any other mesasge will be waiting
* for the current thread to exit. This is accomplished by taking a lock on QRM_LOCK object in the
* DACE.
- *
- * <p>
- * author Asif
- *
+ *
* @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence
* Id
- * @throws CacheException
- * @throws InterruptedException
*/
-
void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException {
ThreadIdentifier ti = getThreadIdentifier(lastDispatched);
long sequenceID = lastDispatched.getSequenceID();
@@ -1764,11 +1707,9 @@ public class HARegionQueue implements RegionQueue {
} else {
// Add the recently added ThreadIdentifier to the RegionQueue for
// expiry
- this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId));
+ this.region.put(ti, dace.lastDispatchedSequenceId);
// update the stats
- // if (logger.isDebugEnabled()) {
this.stats.incThreadIdentifiers();
- // }
}
}
}
@@ -1778,7 +1719,6 @@ public class HARegionQueue implements RegionQueue {
*
* @return the size of the queue
*/
-
public int size() {
acquireReadLock();
try {
@@ -1788,12 +1728,8 @@ public class HARegionQueue implements RegionQueue {
}
}
- void decrementTakeSidePutPermits() {
-
- }
-
void incrementTakeSidePutPermits() {
-
+ // nothing
}
// called from dace on a put.
@@ -1929,13 +1865,8 @@ public class HARegionQueue implements RegionQueue {
/**
* Always returns false for a HARegionQueue class. Suitably overridden in BlockingHARegionQueue
* class.
- *
- * <p>
- * author Asif
- *
+ *
* @return false for HAREgionQueue as this is a non blocking class
- * @throws InterruptedException
- *
*/
boolean waitForData() throws InterruptedException {
return false;
@@ -1976,12 +1907,8 @@ public class HARegionQueue implements RegionQueue {
* @param cache Gemfire Cache instance
* @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws CacheException
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
final int haRgnQType, final boolean isDurable)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
Map container = null;
@@ -1993,7 +1920,7 @@ public class HARegionQueue implements RegionQueue {
container = new HashMap();
}
- return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache,
+ return getHARegionQueueInstance(regionName, cache,
HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haRgnQType, isDurable, container, null,
HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
}
@@ -2009,12 +1936,8 @@ public class HARegionQueue implements RegionQueue {
* @param isPrimary whether this is the primary queue for the client
* @param canHandleDelta boolean indicating whether the HARegionQueue can handle delta or not
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, GemFireCacheImpl cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable, Map haContainer,
ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
boolean canHandleDelta)
@@ -2038,12 +1961,11 @@ public class HARegionQueue implements RegionQueue {
default:
throw new IllegalArgumentException(
LocalizedStrings.HARegionQueue_HARGNQTYPE_CAN_EITHER_BE_BLOCKING_0_OR_NON_BLOCKING_1
- .toLocalizedString(new Object[] {Integer.valueOf(BLOCKING_HA_QUEUE),
- Integer.valueOf(NON_BLOCKING_HA_QUEUE)}));
+ .toLocalizedString(new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE}));
}
if (!isDurable) {
Integer expiryTime = Integer.getInteger(REGION_ENTRY_EXPIRY_TIME, hrqa.getExpiryTime());
- hrqa.setExpiryTime(expiryTime.intValue());
+ hrqa.setExpiryTime(expiryTime);
ExpirationAttributes ea =
new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE);
hrq.region.getAttributesMutator().setEntryTimeToLive(ea);
@@ -2054,19 +1976,10 @@ public class HARegionQueue implements RegionQueue {
/**
* Creates a HARegionQueue object with default attributes. used by tests
*
- * @param regionName
- * @param cache
- * @param hrqa
- * @param haRgnQType
- * @param isDurable
* @return an instance of HARegionQueue
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache,
+ public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
Map container = null;
@@ -2078,8 +1991,8 @@ public class HARegionQueue implements RegionQueue {
container = new HashMap();
}
- return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache, hrqa, haRgnQType,
- isDurable, container, null, HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
+ return getHARegionQueueInstance(regionName, cache, hrqa, haRgnQType, isDurable, container, null,
+ HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE);
}
public boolean isEmptyAckList() {
@@ -2122,7 +2035,7 @@ public class HARegionQueue implements RegionQueue {
if (this.destroyFromAvailableIDsAndRegion(counter)) {
stats.incEventsRemoved();
}
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -2134,18 +2047,14 @@ public class HARegionQueue implements RegionQueue {
}
}
-
-
/**
- * This is an implemention of RegionQueue where peek() & take () are blocking operation and will
+ * This is an implementation of RegionQueue where peek() & take () are blocking operation and will
* not return unless it gets some legitimate value The Lock object used by this class is a
* ReentrantLock & not a ReadWriteLock as in the base class. This reduces the concurrency of peek
* operations, but it enables the condition object of the ReentrantLock used to guard the
* idsAvailable Set for notifying blocking peek & take operations. Previously a separate Lock
* object was used by the BlockingQueue for wait notify. This class will be performant if there is
* a single peek thread.
- *
- *
*/
private static class BlockingHARegionQueue extends HARegionQueue {
/**
@@ -2180,19 +2089,11 @@ public class HARegionQueue implements RegionQueue {
protected final StoppableCondition blockCond;
/**
- *
- * @param regionName
- * @param cache
* @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can
* be set
- * @param haContainer
* @param isPrimary whether this is the primary queue for a client
- * @throws IOException TODO-javadocs
- * @throws ClassNotFoundException TODO-javadocs
- * @throws CacheException TODO-javadocs
- * @throws InterruptedException
*/
- protected BlockingHARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected BlockingHARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2231,12 +2132,9 @@ public class HARegionQueue implements RegionQueue {
* This effectively makes the blocking queue behave like a non-blocking queue which throttles
* puts if it reaches its capacity. This was changed in 8.1, see #51400. This function is NOOP
* in the HARegionQueue.
- *
- * <p>
- * author Asif
*/
@Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
void checkQueueSizeConstraint() throws InterruptedException {
if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
if (Thread.interrupted())
@@ -2261,8 +2159,8 @@ public class HARegionQueue implements RegionQueue {
this.maxQueueSizeHitCount = 0;
}
++this.maxQueueSizeHitCount;
- // for (;;) {
this.region.checkReadiness(); // fix for bug 37581
+ // TODO: wait called while holding two locks
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
this.region.checkReadiness(); // fix for bug 37581
// Fix for #51400. Allow the queue to grow beyond its
@@ -2270,15 +2168,13 @@ public class HARegionQueue implements RegionQueue {
// drain the queue, either due to a slower client or the
// deadlock scenario mentioned in the ticket.
reconcilePutPermits();
- // }
if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
logger.info(LocalizedMessage
.create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS));
}
} catch (InterruptedException ex) {
- // TODO:Asif: The line below is meaningless. Comment it out
- // later
- this.permitMon.notify();
+ // TODO: The line below is meaningless. Comment it out later
+ this.permitMon.notifyAll();
throw ex;
}
}
@@ -2292,14 +2188,10 @@ public class HARegionQueue implements RegionQueue {
/**
* This function should always be called under a lock on putGuard & permitMon obejct
- *
- * <p>
- * author Asif
- *
- * @return int currnet Put permits
+ *
+ * @return int current Put permits
*/
private int reconcilePutPermits() {
-
putPermits += takeSidePutPermits;
takeSidePutPermits = 0;
return putPermits;
@@ -2312,10 +2204,6 @@ public class HARegionQueue implements RegionQueue {
* added in case a put operation which has reduced the put permit optmistically but due to some
* reason ( most likely because of duplicate event) was not added in the queue. In such case it
* will increment take side permit without notifying any waiting thread
- *
- * <p>
- * author Asif
- *
*/
@Override
void incrementTakeSidePutPermitsWithoutNotify() {
@@ -2328,24 +2216,19 @@ public class HARegionQueue implements RegionQueue {
* Implemented to reduce contention between concurrent take/remove operations and put . The
* reconciliation between take side put permits & put side put permits happens only if theput
* side put permits are exhausted. In HARehionQueue base class this is a NOOP function
- *
- * <p>
- * author Asif
- *
*/
@Override
void incrementTakeSidePutPermits() {
if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413
synchronized (this.permitMon) {
++this.takeSidePutPermits;
- this.permitMon.notify();
+ this.permitMon.notifyAll();
}
}
}
/**
* Identical to the acquireReadLock as there is only one type of Lock object in this class.
- *
*/
@Override
void acquireWriteLock() {
@@ -2378,8 +2261,6 @@ public class HARegionQueue implements RegionQueue {
* acquiring the lock on ReentrantLock object. It blocks the thread if the queue is empty or
* returns true otherwise . This will always return true indicating that data is available for
* retrieval or throw an Exception.It can never return false.
- *
- * @throws InterruptedException
*/
@Override
boolean waitForData() throws InterruptedException {
@@ -2443,7 +2324,7 @@ public class HARegionQueue implements RegionQueue {
LinkedList unremovedElements = null;
HashMap currDurableMap = null;
- protected DurableHARegionQueue(String regionName, GemFireCacheImpl cache,
+ protected DurableHARegionQueue(String regionName, InternalCache cache,
HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId,
final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
@@ -2604,11 +2485,11 @@ public class HARegionQueue implements RegionQueue {
this.releaseWriteLock();
}
}
- /**
- * ashetkar: Setting this threadlocal variable to null has no use as the current thread never
- * uses it. Instead it should really be set null by message dispatcher thread while starting
- * or resuming. This was added in revision 20914. Need to check if it really needs to be
- * thread local.
+ /*
+ * Setting this threadlocal variable to null has no use as the current thread never uses it.
+ * Instead it should really be set null by message dispatcher thread while starting or
+ * resuming. This was added in revision 20914. Need to check if it really needs to be thread
+ * local.
*/
peekedEventsContext.set(null);
this.threadIdToSeqId.list.clear();
@@ -2675,38 +2556,27 @@ public class HARegionQueue implements RegionQueue {
* bridge between the user defined HARegionQueue class & the actual class. This class object will
* be buggy as it will tend to publish the Object o QRM thread & the expiry thread before the
* complete creation of the HARegionQueue instance
- *
- * <p>
- * author Asif
- *
*/
static class TestOnlyHARegionQueue extends HARegionQueue {
/**
* Overloaded constructor to accept haContainer.
*
- * @param regionName
- * @param cache
- * @param haContainer
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- TestOnlyHARegionQueue(String regionName, Cache cache, Map haContainer)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, Map haContainer)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
- haContainer, HandShake.CONFLATION_DEFAULT, false);
+ this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haContainer,
+ HandShake.CONFLATION_DEFAULT, false);
this.initialized.set(true);
}
- TestOnlyHARegionQueue(String regionName, Cache cache)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES,
- new HashMap(), HandShake.CONFLATION_DEFAULT, false);
+ this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, new HashMap(),
+ HandShake.CONFLATION_DEFAULT, false);
}
- TestOnlyHARegionQueue(String regionName, GemFireCacheImpl cache, HARegionQueueAttributes hrqa,
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa,
Map haContainer, final byte clientConflation, boolean isPrimary)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
super(regionName, cache, hrqa, haContainer, null, clientConflation, isPrimary);
@@ -2718,34 +2588,21 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Overloaded constructor to pass an <code>HashMap</code> instance as a haContainer.
+ * Overloaded constructor to pass an {@code HashMap} instance as a haContainer.
*
- * @param regionName
- * @param cache
- * @param hrqa
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws CacheException
- * @throws InterruptedException
* @since GemFire 5.7
*/
- TestOnlyHARegionQueue(String regionName, Cache cache, HARegionQueueAttributes hrqa)
+ TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- this(regionName, (GemFireCacheImpl) cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT,
- false);
+ this(regionName, cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT, false);
}
}
/**
* This thread will check for messages which have been dispatched. After a configurable time or
- * size is reached, it will create a new <code>QueueRemovalMessage</code> and send it to all the
- * nodes in the DistributedSystem
- *
- * <p>
- * author Mitul Bid
- *
+ * size is reached, it will create a new {@code QueueRemovalMessage} and send it to all the nodes
+ * in the DistributedSystem
*/
-
private static class QueueRemovalThread extends Thread {
/**
@@ -2753,14 +2610,14 @@ public class HARegionQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
/**
* Constructor : Creates and initializes the thread
*/
- public QueueRemovalThread(GemFireCacheImpl c) {
+ public QueueRemovalThread(InternalCache cache) {
this.setDaemon(true);
- this.cache = c;
+ this.cache = cache;
}
private boolean checkCancelled() {
@@ -2775,11 +2632,7 @@ public class HARegionQueue implements RegionQueue {
/**
* The thread will check the dispatchedMessages map for messages that have been dispatched. It
- * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
- */
- /**
- * The thread will check the dispatchedMessages map for messages that have been dispatched. It
- * will create a new <code>QueueRemovalMessage</code> and send it to the other nodes
+ * will create a new {@code QueueRemovalMessage} and send it to the other nodes
*/
@Override
public void run() {
@@ -2836,7 +2689,7 @@ public class HARegionQueue implements RegionQueue {
dm.putOutgoing(qrm);
} // messages exist
} // be somewhat tolerant of failures
- catch (CancelException e) {
+ catch (CancelException ignore) {
if (logger.isDebugEnabled()) {
logger.debug("QueueRemovalThread is exiting due to cancellation");
}
@@ -2918,14 +2771,14 @@ public class HARegionQueue implements RegionQueue {
* threadIdToSequenceIdMap.list.add(internalMap); } }
*/
// first add the size within the lock
- queueRemovalMessageList.add(Integer.valueOf(internalMap.size()));
+ queueRemovalMessageList.add(internalMap.size());
internalIterator = internalMap.entrySet().iterator();
// then add the event ids to the message list within the lock
while (internalIterator.hasNext()) {
internalEntry = (Map.Entry) internalIterator.next();
tid = (ThreadIdentifier) internalEntry.getKey();
sequenceId = (Long) internalEntry.getValue();
- eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId.longValue());
+ eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId);
queueRemovalMessageList.add(eventId);
}
}
@@ -2945,7 +2798,7 @@ public class HARegionQueue implements RegionQueue {
boolean interrupted = Thread.interrupted();
try {
this.join(15 * 1000);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -2960,13 +2813,9 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Class whick keeps track of the positions ( keys) of underlying Region object for the events
+ * Class which keeps track of the positions ( keys) of underlying Region object for the events
* placed in the Queue. It also keeps track of the last sequence ID dispatched. Thus all the
* events with sequence ID less than that dispatched are eligible for removal
- *
- * <p>
- * author Asif
- *
*/
public static class DispatchedAndCurrentEvents implements DataSerializableFixedID, Serializable {
/**
@@ -3006,10 +2855,6 @@ public class HARegionQueue implements RegionQueue {
/**
* Used for debugging purpose to ensure that in no situation , for a given ThreadIdentifier the
* order gets violated
- *
- * <p>
- * author Asif
- *
*/
protected volatile long lastSequenceIDPut = INIT_OF_SEQUENCEID;
@@ -3024,15 +2869,9 @@ public class HARegionQueue implements RegionQueue {
* @param event Object to be added to the queue
* @param sequenceID Sequence ID of the event originating from a unique thread identified by its
* ThreadIdentifier
- * @throws CacheException
- * @throws InterruptedException
*/
protected boolean putObject(Conflatable event, long sequenceID)
throws CacheException, InterruptedException {
- // logger.debug("BRUCE: putObject() lastSequenceIDPut="+lastSequenceIDPut
- // +"; adding sequenceID="+sequenceID + " for " + event);
- // logger.info("putObject, sequenceID = " + sequenceID + "; lastSequenceIDPut = " +
- // lastSequenceIDPut, new Exception("putObject"));
Long oldPosition = null;
final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER);
if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID
@@ -3072,7 +2911,7 @@ public class HARegionQueue implements RegionQueue {
}
if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) {
// Insert the object into the Region
- Long position = Long.valueOf(owningQueue.tailKey.incrementAndGet());
+ Long position = owningQueue.tailKey.incrementAndGet();
owningQueue.putEventInHARegion(event, position);
@@ -3128,15 +2967,11 @@ public class HARegionQueue implements RegionQueue {
/**
* Destroys the old entry (which got replaced by the new entry due to conflation) from the
- * availableIDs , Region & Counters set. Since this is executed within a synch block by the new
+ * availableIDs , Region & Counters set. Since this is executed within a sync block by the new
* entry thread, it is guaranteed that the old entry thread will exit first , placing the
- * poistion etc in the available IDs set. Also the new entry thraed & old entry thread are
- * belonging to diffrenet ThreadIdentifier objects & hence hold different
+ * position etc in the available IDs set. Also the new entry thread & old entry thread are
+ * belonging to different ThreadIdentifier objects & hence hold different
* DispatchedAndCurrentEvents object.
- *
- * @param oldPosition
- * @throws CacheException
- * @throws InterruptedException
*/
private void removeOldConflatedEntry(Long oldPosition)
throws CacheException, InterruptedException {
@@ -3152,9 +2987,8 @@ public class HARegionQueue implements RegionQueue {
}
// </HA overflow>
// update statistics
- // if (logger.isDebugEnabled()) {
- // vrao: Fix for bug 39291:
+ // Fix for bug 39291:
// Since markers are always conflated regardless of the conflation
// setting and they are not normal (are internal) events, we should
// not bump the events-conflated stat for markers.
@@ -3163,7 +2997,6 @@ public class HARegionQueue implements RegionQueue {
} else {
owningQueue.stats.incMarkerEventsConflated();
}
- // }
}
}
}
@@ -3184,13 +3017,10 @@ public class HARegionQueue implements RegionQueue {
ConcurrentMap conflationMap = (ConcurrentMap) owningQueue.indexes.get(rName);
Assert.assertTrue(conflationMap != null);
conflationMap.remove(key, position);
-
}
/**
* Removes the Entry from the Counters Set contained in DACE
- *
- * @param position
*/
protected synchronized void destroy(Long position) {
if (this.counters != null) {
@@ -3227,17 +3057,17 @@ public class HARegionQueue implements RegionQueue {
owningQueue.eventsMap.remove(ti);
expired = true;
this.owningQueue.getStatistics().decThreadIdentifiers();
- } catch (RegionDestroyedException ignore) {
+ } catch (RegionDestroyedException e) {
if (!owningQueue.destroyInProgress && logger.isDebugEnabled()) {
logger.debug(
"DispatchedAndCurrentEvents::expireOrUpdate: Queue found destroyed while removing expiry entry for ThreadIdentifier={} and expiry value={}",
- ti, expVal, ignore);
+ ti, expVal, e);
}
} catch (EntryNotFoundException enfe) {
if (!owningQueue.destroyInProgress) {
logger.error(LocalizedMessage.create(
LocalizedStrings.HARegionQueue_DISPATCHEDANDCURRENTEVENTSEXPIREORUPDATE_UNEXPECTEDLY_ENCOUNTERED_EXCEPTION_WHILE_REMOVING_EXPIRY_ENTRY_FOR_THREADIDENTIFIER_0_AND_EXPIRY_VALUE_1,
- new Object[] {ti, Long.valueOf(expVal), enfe}));
+ new Object[] {ti, expVal, enfe}));
}
}
}
@@ -3245,7 +3075,7 @@ public class HARegionQueue implements RegionQueue {
if (!expired) {
try {
// Update the entry with latest sequence ID
- owningQueue.region.put(ti, Long.valueOf(this.lastDispatchedSequenceId));
+ owningQueue.region.put(ti, this.lastDispatchedSequenceId);
} catch (CancelException e) {
throw e;
} catch (Exception e) {
@@ -3267,8 +3097,6 @@ public class HARegionQueue implements RegionQueue {
* be removed
*
* @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
- * @throws CacheException
- * @throws InterruptedException
*/
protected void setLastDispatchedIDAndRemoveEvents(long lastDispatchedSeqId)
throws CacheException, InterruptedException {
@@ -3341,8 +3169,6 @@ public class HARegionQueue implements RegionQueue {
* Events which have been peeked & are now candidate for removal. It has to be guaranteed
* that the sequence IDs of all the other counters is less than the last dispatched
* @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE
- * @throws CacheException
- * @throws InterruptedException
*/
protected void setLastDispatchedIDAndRemoveEvents(List removedEventInfoList,
long lastDispatchedSeqId) throws CacheException, InterruptedException {
@@ -3472,8 +3298,6 @@ public class HARegionQueue implements RegionQueue {
/**
* destroys the underlying HARegion and removes its reference from the dispatched messages map
- *
- * @throws CacheWriterException
*/
public void destroy() throws CacheWriterException {
this.destroyInProgress = true;
@@ -3484,9 +3308,9 @@ public class HARegionQueue implements RegionQueue {
try {
try {
updateHAContainer();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// keep going
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// keep going
if (logger.isDebugEnabled()) {
logger.debug("HARegionQueue#destroy: ignored cancellation!!!!");
@@ -3495,9 +3319,9 @@ public class HARegionQueue implements RegionQueue {
try {
this.region.destroyRegion();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// keep going
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// keep going
}
((HAContainerWrapper) haContainer).removeProxy(regionName);
@@ -3510,8 +3334,6 @@ public class HARegionQueue implements RegionQueue {
* If the event is an instance of HAEventWrapper, put it into the haContainer and then into the ha
* region. Otherwise, simply put it into the ha region.
*
- * @param event
- * @param position
* @since GemFire 5.7
*/
protected void putEventInHARegion(Conflatable event, Long position) {
@@ -3623,7 +3445,7 @@ public class HARegionQueue implements RegionQueue {
* If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its
* clientUpdateMessage into the haContainer as <key, value>.
*
- * @param haEventWrapper An instance of <code>HAEventWrapper</code>
+ * @param haEventWrapper An instance of {@code HAEventWrapper}
* @since GemFire 5.7
*/
protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) {
@@ -3690,7 +3512,7 @@ public class HARegionQueue implements RegionQueue {
Object[] wrapperArray = null;
acquireReadLock();
try {
- if (!(this.availableIDsSize() == 0)) {
+ if (this.availableIDsSize() != 0) {
wrapperArray = this.availableIDsArray();
}
} finally {
@@ -3714,7 +3536,7 @@ public class HARegionQueue implements RegionQueue {
HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
}
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
return; // we're done
} catch (Exception e) {
if (logger.isDebugEnabled()) {
@@ -3751,10 +3573,9 @@ public class HARegionQueue implements RegionQueue {
* If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
* in the haContainer, then decrements its reference count. If the decremented ref count is zero
* and put is not in progress, removes the entry from the haContainer, before returning the
- * <code>ClientUpdateMessage</code> instance.
+ * {@code ClientUpdateMessage} instance.
*
- * @param conflatable
- * @return An instance of <code>ClientUpdateMessage</code>
+ * @return An instance of {@code ClientUpdateMessage}
* @since GemFire 5.7
*/
public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) {
@@ -3778,7 +3599,6 @@ public class HARegionQueue implements RegionQueue {
* Decrements wrapper's reference count by one. If the decremented ref count is zero and put is
* not in progress, removes the entry from the haContainer.
*
- * @param wrapper
* @since GemFire 5.7
*/
public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) {
@@ -3813,7 +3633,7 @@ public class HARegionQueue implements RegionQueue {
/**
* Set whether the dispatcher of this node is active or not (i.e. primary or secondary node). If
- * <code>flag</code> is set to <code>true</code>, disables Entry Expiry Tasks.
+ * {@code flag} is set to {@code true}, disables Entry Expiry Tasks.
*
* @param flag the value to set isPrimary to
*/
@@ -3830,7 +3650,7 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * Disables EntryExpiryTask for the HARegion (<code>this.region</code>).
+ * Disables EntryExpiryTask for the HARegion ({@code this.region}).
*
*/
private void disableEntryExpiryTasks() {
@@ -3842,7 +3662,7 @@ public class HARegionQueue implements RegionQueue {
this.region.setCustomEntryTimeToLive(new ThreadIdentifierCustomExpiry());
logger.info(LocalizedMessage.create(
LocalizedStrings.HARegionQueue_ENYTRY_EXPIRY_TASKS_DISABLED_BECAUSE_QUEUE_BECAME_PRIMARY_OLD_MSG_TTL_0,
- new Object[] {Integer.valueOf(oldTimeToLive)}));
+ new Object[] {oldTimeToLive}));
}
}
@@ -3882,7 +3702,7 @@ public class HARegionQueue implements RegionQueue {
if (r != null && !r.isDestroyed()) {
try {
r.close();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
}
}
}
@@ -3895,97 +3715,83 @@ public class HARegionQueue implements RegionQueue {
*/
public boolean isPeekInitialized() {
return HARegionQueue.peekedEventsContext.get() != null;
-
}
-}
-
-
-/**
- * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is operating
- * on it. This wrapper acts as a means of communication between the QRM thread & the MapWrapper
- * object contained in the HARegionQueue
- *
- * <p>
- * author ashahid
- */
-
-class MapWrapper {
- Map map;
+ /**
+ * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is
+ * operating on it. This wrapper acts as a means of communication between the QRM thread & the
+ * MapWrapper object contained in the HARegionQueue
+ */
+ static class MapWrapper {
+ Map map;
- List list;
+ List list;
- boolean keepPrevAcks = false;
+ boolean keepPrevAcks = false;
- public MapWrapper() {
- super();
- map = new HashMap();
- list = new LinkedList();
- }
+ public MapWrapper() {
+ super();
+ map = new HashMap();
+ list = new LinkedList();
+ }
- void put(Object key, Object o) {
- synchronized (this.map) {
- this.map.put(key, o);
+ void put(Object key, Object o) {
+ synchronized (this.map) {
+ this.map.put(key, o);
+ }
}
}
-}
-
-/**
- * A wrapper class that has counter, key and the region-name for an event which was peeked and needs
- * to be removed. The key and regionName fields will be set only if conflation is true for the
- * event.
- *
- * <p>
- * author dpatel
- *
- */
-
-class RemovedEventInfo {
- Long counter;
+ /**
+ * A wrapper class that has counter, key and the region-name for an event which was peeked and
+ * needs to be removed. The key and regionName fields will be set only if conflation is true for
+ * the event.
+ */
+ static class RemovedEventInfo {
+ Long counter;
- String regionName;
+ String regionName;
- Object key;
+ Object key;
- public RemovedEventInfo(Long counter, String regionName, Object key) {
- this.counter = counter;
- this.regionName = regionName;
- this.key = key;
+ public RemovedEventInfo(Long counter, String regionName, Object key) {
+ this.counter = counter;
+ this.regionName = regionName;
+ this.key = key;
+ }
}
-}
+ /** this is used to expire thread identifiers, even in primary queues */
+ static class ThreadIdentifierCustomExpiry implements CustomExpiry {
+ private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
+ HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
+ private static volatile ExpirationAttributes testExpAtts = null;
-/** this is used to expire thread identifiers, even in primary queues */
-class ThreadIdentifierCustomExpiry implements CustomExpiry {
- private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
- HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
- private static volatile ExpirationAttributes testExpAtts = null;
-
- public ExpirationAttributes getExpiry(Region.Entry entry) {
- // Use key to determine expiration.
- Object key = entry.getKey();
- if (key instanceof ThreadIdentifier) {
- final int expTime = HARegionQueue.threadIdExpiryTime;
- if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
- // This should only happen in unit test code
- ExpirationAttributes result = testExpAtts;
- if (result == null || result.getTimeout() != expTime) {
- result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
- // save the expiration attributes in a static to prevent tests from creating lots of
- // instances.
- testExpAtts = result;
+ public ExpirationAttributes getExpiry(Region.Entry entry) {
+ // Use key to determine expiration.
+ Object key = entry.getKey();
+ if (key instanceof ThreadIdentifier) {
+ final int expTime = HARegionQueue.threadIdExpiryTime;
+ if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
+ // This should only happen in unit test code
+ ExpirationAttributes result = testExpAtts;
+ if (result == null || result.getTimeout() != expTime) {
+ result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE);
+ // save the expiration attributes in a static to prevent tests from creating lots of
+ // instances.
+ testExpAtts = result;
+ }
+ return result;
+ } else {
+ return DEFAULT_THREAD_ID_EXP_ATTS;
}
- return result;
} else {
- return DEFAULT_THREAD_ID_EXP_ATTS;
+ return null;
}
- } else {
- return null;
}
- }
- public void close() {}
+ public void close() {}
+ }
}