You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/11 21:06:53 UTC
[14/52] [abbrv] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..f09bb47 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
@@ -67,7 +66,7 @@ import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
@@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -119,8 +117,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
protected volatile boolean resetLastPeeked = false;
-
-
/**
* There will be one shadow pr for each of the PartitionedRegion which has added the
* GatewaySender. Fix for Bug#45917: we maintain a tempQueue to queue events when buckets are not
@@ -134,8 +130,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* the secondary nodes to remove the events which have already been dispatched from the queue.
*/
public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 10;
+
// TODO:REF: how to change the message sync interval ? should it be common for serial and parallel
protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL;
+
// TODO:REF: name change for thread, as it appears in the log
private BatchRemovalThread removalThread = null;
@@ -223,16 +221,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
final protected int index;
+
final protected int nDispatcher;
private MetaRegionFactory metaRegionFactory;
- /**
- * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote site.
- * It is cleared when the queue is cleared.
- */
- // private final BlockingQueue<Long> eventSeqNumQueue;
-
public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region> userRegions, int idx,
int nDispatcher) {
this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory());
@@ -249,7 +242,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
this.sender = sender;
List<Region> listOfRegions = new ArrayList<Region>(userRegions);
- // eventSeqNumQueue = new LinkedBlockingQueue<Long>();
Collections.sort(listOfRegions, new Comparator<Region>() {
@Override
public int compare(Region o1, Region o2) {
@@ -273,7 +265,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
throw new GatewaySenderConfigurationException(
LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1
.toLocalizedString(new Object[] {this.sender.getId(), userRegion.getFullPath()}));
- // addShadowPartitionedRegionForUserRR((DistributedRegion)userRegion);
}
}
@@ -295,7 +286,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// still, it is safer approach to synchronize it
synchronized (ParallelGatewaySenderQueue.class) {
if (removalThread == null) {
- removalThread = new BatchRemovalThread((GemFireCacheImpl) this.sender.getCache(), this);
+ removalThread = new BatchRemovalThread(this.sender.getCache(), this);
removalThread.start();
}
}
@@ -317,7 +308,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (this.userRegionNameToshadowPRMap.containsKey(regionName))
return;
- GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache();
+ InternalCache cache = sender.getCache();
final String prQName = getQueueName(sender.getId(), userRegion.getFullPath());
prQ = (PartitionedRegion) cache.getRegion(prQName);
if (prQ == null) {
@@ -375,8 +366,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
prQ.getPartitionAttributes());
}
- // Suranjan: TODO This should not be set on the PR but on the
- // GatewaySender
+ // TODO This should not be set on the PR but on the GatewaySender
prQ.enableConflation(sender.isBatchConflationEnabled());
// Before going ahead, make sure all the buckets of shadowPR are
@@ -391,32 +381,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
// In case of Replicated Region it may not be necessary.
- // if (sender.isPersistenceEnabled()) {
- // //Kishor: I need to write a test for this code.
- // Set<Integer> allBucketsClone = new HashSet<Integer>();
- // // allBucketsClone.addAll(allBuckets);*/
- // for (int i = 0; i < sender.getMaxParallelismForReplicatedRegion(); i++)
- // allBucketsClone.add(i);
- //
- // while (!(allBucketsClone.size() == 0)) {
- // Iterator<Integer> itr = allBucketsClone.iterator();
- // while (itr.hasNext()) {
- // InternalDistributedMember node = prQ.getNodeForBucketWrite(
- // itr.next(), null);
- // if (node != null) {
- // itr.remove();
- // }
- // }
- // // after the iteration is over, sleep for sometime before trying
- // // again
- // try {
- // Thread.sleep(WAIT_CYCLE_SHADOW_BUCKET_LOAD);
- // }
- // catch (InterruptedException e) {
- // logger.error(e);
- // }
- // }
- // }
} catch (IOException veryUnLikely) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0,
@@ -453,7 +417,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
private static String convertPathToName(String fullPath) {
- // return fullPath.replaceAll("/", "_");
return "";
}
@@ -490,7 +453,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
.toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()}));
}
- GemFireCacheImpl cache = (GemFireCacheImpl) sender.getCache();
+ InternalCache cache = sender.getCache();
boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath());
@@ -549,7 +512,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
.setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
// at this point we should be able to assert prQ == meta;
- // Suranjan: TODO This should not be set on the PR but on the GatewaySender
+ // TODO This should not be set on the PR but on the GatewaySender
prQ.enableConflation(sender.isBatchConflationEnabled());
if (isAccessor)
return; // return from here if accessor node
@@ -576,7 +539,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} finally {
if (prQ != null) {
-
this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
}
/*
@@ -611,7 +573,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
protected void afterRegionAdd(PartitionedRegion userPR) {
-
+ // nothing
}
/**
@@ -666,18 +628,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public boolean put(Object object) throws InterruptedException, CacheException {
final boolean isDebugEnabled = logger.isDebugEnabled();
boolean putDone = false;
- // Suranjan : Can this region ever be null? Should we work with regionName and not with region
+ // Can this region ever be null? Should we work with regionName and not with region
// instance.
// It can't be, as the put is happening on the region and it's still in progress
GatewaySenderEventImpl value = (GatewaySenderEventImpl) object;
boolean isDREvent = isDREvent(value);
- // if (isDREvent(value)) {
- // putInShadowPRForReplicatedRegion(object);
- // value.freeOffHeapValue();
- // return;
- // }
-
Region region = value.getRegion();
String regionPath = null;
if (isDREvent) {
@@ -795,11 +751,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
brq.getInitializationLock().readLock().unlock();
}
} else {
- // tempQueue = this.bucketToTempQueueMap.get(bucketId);
- // if (tempQueue == null) {
- // tempQueue = new LinkedBlockingQueue();
- // this.bucketToTempQueueMap.put(bucketId, tempQueue);
- // }
tempQueue.add(value);
putDone = true;
// For debugging purpose.
@@ -811,7 +762,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
}
- // }
}
} finally {
@@ -873,12 +823,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
try {
if (brq != null) {
addedValueToQueue = brq.addToQueue(key, value);
- // TODO : Kishor : During merge, ParallelWANstats test failed. On
+ // TODO: During merge, ParallelWANstats test failed. On
// commenting out the code below, the test passed. cheetah does not have the code below.
// need to find out from which revision this code came
- // if (brq.getBucketAdvisor().isPrimary()) {
- // this.stats.incQueueSize();
- // }
}
} catch (BucketNotFoundException e) {
if (logger.isDebugEnabled()) {
@@ -933,18 +880,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return new HashSet(this.userRegionNameToshadowPRMap.values());
}
- // TODO: Suranjan Find optimal way to get Random shadow pr as this will be called in each put and
- // peek.
+ // TODO: Find optimal way to get Random shadow pr as this will be called in each put and peek.
protected PartitionedRegion getRandomShadowPR() {
PartitionedRegion prQ = null;
if (this.userRegionNameToshadowPRMap.values().size() > 0) {
int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size());
prQ = (PartitionedRegion) this.userRegionNameToshadowPRMap.values().toArray()[randomIndex];
}
- // if (this.userPRToshadowPRMap.values().size() > 0
- // && (prQ == null)) {
- // prQ = getRandomShadowPR();
- // }
return prQ;
}
@@ -1029,13 +971,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// TODO:REF: instead of shuffle use random number, in this method we are
// returning id instead we should return BRQ itself
- /*
- * Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets) {
- * BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
- * .getBucketRegionQueueByBucketId(bucketId);
- *
- * if (br != null && br.isReadyForPeek()) { return br.getId(); } }
- */
}
return -1;
}
@@ -1052,9 +987,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
GatewaySenderEventImpl event = this.peekedEvents.remove();
try {
- // PartitionedRegion prQ = this.userPRToshadowPRMap.get(ColocationHelper
- // .getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath());
- //
PartitionedRegion prQ = null;
int bucketId = -1;
Object key = null;
@@ -1071,11 +1003,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
} else {
String regionPath = event.getRegionPath();
- GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache();
+ InternalCache cache = this.sender.getCache();
Region region = (PartitionedRegion) cache.getRegion(regionPath);
if (region != null && !region.isDestroyed()) {
- // TODO: Suranjan We have to get colocated parent region for this
- // region
+ // TODO: We have to get colocated parent region for this region
if (region instanceof DistributedRegion) {
prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
event.getBucketId();
@@ -1105,7 +1036,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) {
boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
- // TODO : Kishor : Make sure we dont need to initalize a bucket
+ // TODO : Make sure we don't need to initialize a bucket
// before destroying a key from it
try {
if (brq != null) {
@@ -1261,7 +1192,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- blockProcesorThreadIfRequired();
+ blockProcessorThreadIfRequired();
return batch;
}
@@ -1340,7 +1271,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
this, batch.size(), size(), localSize());
}
if (batch.size() == 0) {
- blockProcesorThreadIfRequired();
+ blockProcessorThreadIfRequired();
}
return batch;
}
@@ -1400,10 +1331,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
- protected void blockProcesorThreadIfRequired() throws InterruptedException {
+ protected void blockProcessorThreadIfRequired() throws InterruptedException {
queueEmptyLock.lock();
try {
- // while (isQueueEmpty) {
if (isQueueEmpty) { // merge44610: this if condition came from cheetah 44610
if (logger.isDebugEnabled()) {
logger.debug("Going to wait, till notified.");
@@ -1414,7 +1344,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// parameter but cedar does not have such corresponding method
queueEmptyCondition.await(1000);
// merge44610: this time waiting came from cheetah 44610
- // isQueueEmpty = this.localSize() == 0;
}
// update the flag so that next time when we come we will block.
isQueueEmpty = this.localSizeForProcessor() == 0;
@@ -1526,7 +1455,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
throw new UnsupportedOperationException();
}
-
@Override
public void remove(int batchSize) throws CacheException {
for (int i = 0; i < batchSize; i++) {
@@ -1596,14 +1524,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
@Override
public void close() {
// Because of bug 49060 do not close the regions of a parallel queue
- // for (Region r: getRegions()) {
- // if (r != null && !r.isDestroyed()) {
- // try {
- // r.close();
- // } catch (RegionDestroyedException e) {
- // }
- // }
- // }
}
/**
@@ -1634,14 +1554,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
private final ParallelGatewaySenderQueue parallelQueue;
/**
* Constructor : Creates and initializes the thread
*/
- public BatchRemovalThread(GemFireCacheImpl c, ParallelGatewaySenderQueue queue) {
+ public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
super("BatchRemovalThread");
// TODO:REF: Name for this thread ?
this.setDaemon(true);
@@ -1772,7 +1692,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
}
- private Set<InternalDistributedMember> getAllRecipients(GemFireCacheImpl cache, Map map) {
+ private Set<InternalDistributedMember> getAllRecipients(InternalCache cache, Map map) {
Set recipients = new ObjectOpenHashSet();
for (Object pr : map.keySet()) {
PartitionedRegion partitionedRegion = (PartitionedRegion) cache.getRegion((String) pr);
@@ -1811,7 +1731,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
AbstractGatewaySender sender = null;
public ParallelGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs,
- LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender pgSender) {
+ LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender pgSender) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null)
@@ -1872,8 +1792,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
static class MetaRegionFactory {
- ParallelGatewaySenderQueueMetaRegion newMetataRegion(GemFireCacheImpl cache,
- final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) {
+ ParallelGatewaySenderQueueMetaRegion newMetataRegion(InternalCache cache, final String prQName,
+ final RegionAttributes ra, AbstractGatewaySender sender) {
ParallelGatewaySenderQueueMetaRegion meta =
new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
return meta;
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index a7f224f..d79e2c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -38,6 +38,7 @@ import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
@@ -52,7 +53,6 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
*
* @since GemFire 8.0
*/
-
public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
private static final Logger logger = LogService.getLogger();
@@ -73,7 +73,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
@Override
protected void process(DistributionManager dm) {
final boolean isDebugEnabled = logger.isDebugEnabled();
- final GemFireCacheImpl cache;
+ final InternalCache cache;
cache = GemFireCacheImpl.getInstance();
if (cache != null) {
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 02baf81..7928662 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -54,7 +54,7 @@ import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueue;
@@ -72,7 +72,6 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
/**
* @since GemFire 7.0
- *
*/
public class SerialGatewaySenderQueue implements RegionQueue {
@@ -208,14 +207,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
initializeRegion(abstractSender, listener);
// Increment queue size. Fix for bug 51988.
this.stats.incQueueSize(this.region.size());
- this.removalThread = new BatchRemovalThread((GemFireCacheImpl) abstractSender.getCache());
+ this.removalThread = new BatchRemovalThread(abstractSender.getCache());
this.removalThread.start();
this.sender = abstractSender;
if (logger.isDebugEnabled()) {
logger.debug("{}: Contains {} elements", this, size());
}
-
-
}
public Region<Long, AsyncEvent> getRegion() {
@@ -233,18 +230,8 @@ public class SerialGatewaySenderQueue implements RegionQueue {
(r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
final boolean isWbcl = this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
if (!(isPDXRegion && isWbcl)) {
- // TODO: Kishor : after merging this change. AsyncEventQueue test failed
- // with data inconsistency. As of now going ahead with sync putandGetKey.
- // Need to work on this during cedar
- // if (this.keyPutNoSync) {
- // putAndGetKeyNoSync(event);
- // }
- // else {
- // synchronized (this) {
putAndGetKey(event);
return true;
- // }
- // }
}
return false;
}
@@ -366,26 +353,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// If we do want to support it then each caller needs
// to call freeOffHeapResources and the returned GatewaySenderEventImpl
throw new UnsupportedOperationException();
- // resetLastPeeked();
- // AsyncEvent object = peekAhead();
- // // If it is not null, destroy it and increment the head key
- // if (object != null) {
- // Long key = this.peekedIds.remove();
- // if (logger.isTraceEnabled()) {
- // logger.trace("{}: Retrieved {} -> {}",this, key, object);
- // }
- // // Remove the entry at that key with a callback arg signifying it is
- // // a WAN queue so that AbstractRegionEntry.destroy can get the value
- // // even if it has been evicted to disk. In the normal case, the
- // // AbstractRegionEntry.destroy only gets the value in the VM.
- // this.region.destroy(key, RegionQueue.WAN_QUEUE_TOKEN);
- // updateHeadKey(key.longValue());
-
- // if (logger.isTraceEnabled()) {
- // logger.trace("{}: Destroyed {} -> {}", this, key, object);
- // }
- // }
- // return object;
}
public List<AsyncEvent> take(int batchSize) throws CacheException {
@@ -393,20 +360,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// If we do want to support it then the callers
// need to call freeOffHeapResources on each returned GatewaySenderEventImpl
throw new UnsupportedOperationException();
- // List<AsyncEvent> batch = new ArrayList<AsyncEvent>(
- // batchSize * 2);
- // for (int i = 0; i < batchSize; i++) {
- // AsyncEvent obj = take();
- // if (obj != null) {
- // batch.add(obj);
- // } else {
- // break;
- // }
- // }
- // if (logger.isTraceEnabled()) {
- // logger.trace("{}: Took a batch of {} entries", this, batch.size());
- // }
- // return batch;
}
/**
@@ -442,7 +395,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
this.lastDispatchedKey = key;
if (wasEmpty) {
- this.notify();
+ notifyAll();
}
if (logger.isDebugEnabled()) {
@@ -468,7 +421,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
public Object peek() throws CacheException {
- // resetLastPeeked();
Object object = peekAhead();
if (logger.isTraceEnabled()) {
logger.trace("{}: Peeked {} -> {}", this, peekedIds, object);
@@ -494,7 +446,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
}
List<AsyncEvent> batch = new ArrayList<AsyncEvent>(size * 2); // why
// *2?
- // resetLastPeeked();
while (batch.size() < size) {
AsyncEvent object = peekAhead();
// Conflate here
@@ -725,7 +676,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
/**
* Clear the list of peeked keys. The next peek will start again at the head key.
- *
*/
public void resetLastPeeked() {
this.peekedIds.clear();
@@ -736,7 +686,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*
* @throws CacheException
*/
-
private Long getCurrentKey() {
long currentKey;
if (this.peekedIds.isEmpty()) {
@@ -775,7 +724,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
return null;
}
-
// It's important here that we check where the current key
// is in relation to the tail key before we check to see if the
// object exists. The reason is that the tail key is basically
@@ -785,7 +733,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// If we check for the object, and then check the tail key, we could
// skip objects.
- // @todo don't do a get which updates the lru, instead just get the value
+ // TODO: don't do a get which updates the lru, instead just get the value
// w/o modifying the LRU.
// Note: getting the serialized form here (if it has overflowed to disk)
// does not save anything since GatewayBatchOp needs to GatewayEventImpl
@@ -969,7 +917,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void initializeRegion(AbstractGatewaySender sender, CacheListener listener) {
- final GemFireCacheImpl gemCache = (GemFireCacheImpl) sender.getCache();
+ final InternalCache gemCache = sender.getCache();
this.region = gemCache.getRegion(this.regionName);
if (this.region == null) {
AttributesFactory<Long, AsyncEvent> factory = new AttributesFactory<Long, AsyncEvent>();
@@ -992,11 +940,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
factory.setEvictionAttributes(ea);
factory.setConcurrencyChecksEnabled(false);
-
factory.setDiskStoreName(this.diskStoreName);
- // TODO: Suranjan, can we do the following
- // In case of persistence write to disk sync and in case of eviction
- // write in async
+
+ // In case of persistence write to disk sync and in case of eviction write in async
factory.setDiskSynchronous(this.isDiskSynchronous);
// Create the region
@@ -1067,12 +1013,14 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private volatile boolean shutdown = false;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
/**
* Constructor : Creates and initializes the thread
+ *
+ * @param c
*/
- public BatchRemovalThread(GemFireCacheImpl c) {
+ public BatchRemovalThread(InternalCache c) {
this.setDaemon(true);
this.cache = c;
}
@@ -1213,7 +1161,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
AbstractGatewaySender sender = null;
protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs,
- LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender sender) {
+ LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null)