You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by su...@apache.org on 2015/11/02 06:43:34 UTC
incubator-geode git commit: WAN SingleHop Implementation
Repository: incubator-geode
Updated Branches:
refs/heads/feature/wan_single_hop_wip aaa5e22eb -> 37a9ee5f5
WAN SingleHop Implementation
Follow up checkins for fixes of issues related SerialGatewaySender,ClassCastException and NPE
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/37a9ee5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/37a9ee5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/37a9ee5f
Branch: refs/heads/feature/wan_single_hop_wip
Commit: 37a9ee5f528aefe5ae63743b80c7cb399bb7b768
Parents: aaa5e22
Author: Suranjan Kumar <sk...@pivotal.io>
Authored: Mon Nov 2 10:56:17 2015 +0530
Committer: Suranjan Kumar <sk...@pivotal.io>
Committed: Mon Nov 2 10:56:17 2015 +0530
----------------------------------------------------------------------
.../gemfire/internal/cache/BucketAdvisor.java | 2 +-
.../cache/partitioned/RegionAdvisor.java | 107 +++++++------
.../sockets/command/GatewayReceiverCommand.java | 46 +++---
.../AbstractGatewaySenderEventProcessor.java | 78 ++++-----
.../ParallelGatewaySenderEventProcessor.java | 42 ++---
.../parallel/ParallelGatewaySenderQueue.java | 160 +++++++++----------
6 files changed, 227 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 9fff9da..64d340f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1794,7 +1794,7 @@ public final class BucketAdvisor extends CacheDistributionAdvisor {
if (!this.pRegion.isShadowPR()) {
GemFireCacheImpl c = getProxyBucketRegion().getCache();
List servers = null;
- servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
+ //servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
servers = c.getCacheServersAndGatewayReceiver();
HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
index 0f9ceb6..8ba3d1f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractSet;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +46,7 @@ import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.InterestPolicy;
import com.gemstone.gemfire.cache.LowMemoryException;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.ProfileListener;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -306,76 +308,91 @@ public class RegionAdvisor extends CacheDistributionAdvisor
/**
* get the bucket to primary serverlocation map.
+ *
* @return
*/
-
+
public Map<Integer, BucketServerLocation66> getAllPrimaryClientBucketProfiles() {
- Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
+ Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
for (Integer bucketId : this.clientBucketProfilesMap.keySet()) {
for (BucketProfile profile : this.clientBucketProfilesMap.get(bucketId)) {
- ServerBucketProfile cProfile = (ServerBucketProfile)profile;
- Set<BucketServerLocation66> bucketServerLocations = cProfile.getBucketServerLocations();
- for(BucketServerLocation66 loc: bucketServerLocations) {
- if(loc.isPrimary()) {
- bucketToPrimaryServerLocation.put(bucketId, loc);
- }
- }
+ ServerBucketProfile cProfile = (ServerBucketProfile) profile;
+ Set<BucketServerLocation66> bucketServerLocations = cProfile
+ .getBucketServerLocations();
+ for (BucketServerLocation66 loc : bucketServerLocations) {
+ if (loc.isPrimary()) {
+ bucketToPrimaryServerLocation.put(bucketId, loc);
+ }
+ }
}
}
-
+
if (getPartitionedRegion().isDataStore()) {
- for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
+ for (Integer bucketId : getPartitionedRegion().getDataStore()
+ .getAllLocalBucketIds()) {
BucketProfile profile = getBucketAdvisor(bucketId).getLocalProfile();
if (logger.isDebugEnabled()) {
logger.debug("The local profile is : {}", profile);
}
-
+
if (profile != null) {
if (profile instanceof ServerBucketProfile) {
- ServerBucketProfile cProfile = (ServerBucketProfile)profile;
+ ServerBucketProfile cProfile = (ServerBucketProfile) profile;
Set<BucketServerLocation66> bucketServerLocations = cProfile
.getBucketServerLocations();
-
- for(BucketServerLocation66 loc: bucketServerLocations) {
- if(loc.isPrimary()) {
- bucketToPrimaryServerLocation.put(bucketId, loc);
- }
+
+ for (BucketServerLocation66 loc : bucketServerLocations) {
+ if (loc.isPrimary()) {
+ bucketToPrimaryServerLocation.put(bucketId, loc);
+ }
}
}
}
}
}
if (logger.isDebugEnabled()) {
- logger.debug("The bucket to primary serverlocation is : {}", bucketToPrimaryServerLocation);
- }
+ logger.debug("The bucket to primary serverlocation is : {}",
+ bucketToPrimaryServerLocation);
+ }
return bucketToPrimaryServerLocation;
}
-
- public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
- Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
-
- for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
- .values()) {
- if (recieverToPrimaryBucketMap.containsKey(location)) {
- Set<Integer> l = recieverToPrimaryBucketMap.get(location);
- l.add(location.getBucketId());
- } else {
- Set<Integer> bucketList = new HashSet<Integer>();
- bucketList.add(location.getBucketId());
- recieverToPrimaryBucketMap.put(location, bucketList);
- }
- }
-
- Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
-
- for(BucketServerLocation66 bl: recieverToPrimaryBucketMap.keySet()) {
- recieverToPrimaryBucketMapCopy.put(
- new ServerLocation(bl.getHostName(), bl.getPort()),
- recieverToPrimaryBucketMap.get(bl));
- }
- return recieverToPrimaryBucketMapCopy;
- }
+
+ public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
+ Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
+
+ for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
+ .values()) {
+ if (recieverToPrimaryBucketMap.containsKey(location)) {
+ Set<Integer> l = recieverToPrimaryBucketMap.get(location);
+ l.add(location.getBucketId());
+ } else {
+ Set<Integer> bucketList = new HashSet<Integer>();
+ bucketList.add(location.getBucketId());
+ recieverToPrimaryBucketMap.put(location, bucketList);
+ }
+ }
+
+ Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
+
+ // GemFireCacheImpl c = getPartitionedRegion().getCache();
+ // Set<GatewayReceiver> s = c.getGatewayReceivers();
+
+ for (BucketServerLocation66 bl : recieverToPrimaryBucketMap.keySet()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The servergroup is : {}", bl.getServerGroups());
+ }
+ List<String> allgroups = Arrays.asList(bl.getServerGroups());
+ boolean contains = allgroups.contains(GatewayReceiver.RECEIVER_GROUP);
+
+ if (contains) {
+ recieverToPrimaryBucketMapCopy.put(
+ new ServerLocation(bl.getHostName(), bl.getPort()),
+ recieverToPrimaryBucketMap.get(bl));
+ }
+ }
+ return recieverToPrimaryBucketMapCopy;
+ }
public ConcurrentHashMap<Integer, Set<ServerBucketProfile>> getAllClientBucketProfilesTest() {
ConcurrentHashMap<Integer, Set<ServerBucketProfile>> map = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 9334d4f..f39b9ba 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -754,29 +754,31 @@ public class GatewayReceiverCommand extends BaseCommand {
replyMsg.setTransactionId(msg.getTransactionId());
boolean nwhop = false;
- if(region instanceof PartitionedRegion ) {
- PartitionedRegion pr = (PartitionedRegion)region;
- nwhop = pr.isNetworkHop().byteValue() != (byte)0;
+ if (region instanceof PartitionedRegion) {
+ PartitionedRegion pr = (PartitionedRegion) region;
+ nwhop = pr.isNetworkHop().byteValue() != (byte) 0;
}
- if (nwhop) {
- PartitionedRegion pr = (PartitionedRegion)region;
- Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
- .getRegionAdvisor().getAllPrimaryBucketLocations();
- if (logger.isDebugEnabled()) {
- logger.debug("Replying with the primary locations as nwhop occurred {}", recieverToPrimaryBucketMap);
- }
- replyMsg.setNumberOfParts(3);
- replyMsg.addIntPart(batchId);
- replyMsg.addIntPart(numberOfEvents);
- replyMsg.addObjPart(recieverToPrimaryBucketMap);
- }
- else{
- if (logger.isDebugEnabled()) {
- logger.debug("Not Replying with the primary locations as no nwhop occurred");
- }
- replyMsg.setNumberOfParts(2);
- replyMsg.addIntPart(batchId);
- replyMsg.addIntPart(numberOfEvents);
+ if (nwhop && (servConn.getClientVersion() != null)
+ && (servConn.getClientVersion().compareTo(Version.GFE_90) >= 0)) {
+ PartitionedRegion pr = (PartitionedRegion) region;
+ Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
+ .getRegionAdvisor().getAllPrimaryBucketLocations();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Replying with the primary locations as nwhop occurred {}",
+ recieverToPrimaryBucketMap);
+ }
+ replyMsg.setNumberOfParts(3);
+ replyMsg.addIntPart(batchId);
+ replyMsg.addIntPart(numberOfEvents);
+ replyMsg.addObjPart(recieverToPrimaryBucketMap);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Not Replying with the primary locations as no nwhop occurred");
+ }
+ replyMsg.setNumberOfParts(2);
+ replyMsg.addIntPart(batchId);
+ replyMsg.addIntPart(numberOfEvents);
}
replyMsg.setTransactionId(msg.getTransactionId());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e6e9207..1886811 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -491,12 +491,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
}*/
}
- if(this.resumeWhenPeekedEventsEmpty) {
- if(this.queue instanceof ParallelGatewaySenderQueue) {
- events = ((ParallelGatewaySenderQueue)this.queue).peekAlreadyPeekedEvents();
- }
- }else{
- events = this.queue.peek(batchSize, batchTimeInterval);
+ if (this.resumeWhenPeekedEventsEmpty) {
+ if (this.queue instanceof ParallelGatewaySenderQueue) {
+ events = ((ParallelGatewaySenderQueue) this.queue)
+ .peekAlreadyPeekedEvents();
+ }
+ } else {
+ events = this.queue.peek(batchSize, batchTimeInterval);
}
} catch (InterruptedException e) {
interrupted = true;
@@ -1377,37 +1378,36 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
}
- public boolean isPeekedEventEmpty() {
- if (logger.isDebugEnabled()) {
- logger.debug("SKSKSK Getting called for this processor " + this);
- }
- BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
- .getPeekedEvents();
- if (logger.isDebugEnabled()) {
- logger.debug("SKSKSK the peeked evets are for this processor "
- + peekedEvents);
-
- }
-
-// synchronized (peekedEvents) {
-// while(!peekedEvents.isEmpty()) {
-// try {
-// peekedEvents.wait();
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-// }
-// }
-
- if (logger.isDebugEnabled()) {
- logger.debug("SKSKSK WAIT COMPLETE "
- + peekedEvents);
-
- }
- return peekedEvents.isEmpty();
- }
-
- public void markResumeWhenPeekedEventEmpty(boolean flag) {
- this.resumeWhenPeekedEventsEmpty = flag;
- }
+ public boolean isPeekedEventEmpty() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("SKSKSK Getting called for this processor " + this);
+ }
+ BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
+ .getPeekedEvents();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "SKSKSK the peeked evets are for this processor " + peekedEvents);
+
+ }
+
+ // synchronized (peekedEvents) {
+ // while(!peekedEvents.isEmpty()) {
+ // try {
+ // peekedEvents.wait();
+ // } catch (InterruptedException e) {
+ // e.printStackTrace();
+ // }
+ // }
+ // }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("SKSKSK WAIT COMPLETE " + peekedEvents);
+
+ }
+ return peekedEvents.isEmpty();
+ }
+
+ public void markResumeWhenPeekedEventEmpty(boolean flag) {
+ this.resumeWhenPeekedEventsEmpty = flag;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 9bc3ae8..2181114 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
@@ -51,6 +52,7 @@ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.size.SingleObjectSizer;
import java.io.IOException;
@@ -82,23 +84,27 @@ public class ParallelGatewaySenderEventProcessor extends
initializeMessageQueue(sender.getId());
setDaemon(true);
}
-
- public synchronized void setBuckets(Set<Integer> s) {
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderEventProcessor before setting: {} , {}", s, this.buckets);
- }
- this.buckets.clear();
- this.buckets.addAll(s);
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderEventProcessor after settingn: {} , {}", s, this.buckets);
- }
-
- ((ParallelGatewaySenderQueue)this.queue).setBuckets(this.buckets);
- ((ParallelGatewaySenderQueue)this.queue).setSingleHop();
+
+ public synchronized void setBuckets(Set<Integer> s) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "ParallelGatewaySenderEventProcessor before setting: {} , {}", s,
+ this.buckets);
+ }
+ this.buckets.clear();
+ this.buckets.addAll(s);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "ParallelGatewaySenderEventProcessor after settingn: {} , {}", s,
+ this.buckets);
+ }
+
+ ((ParallelGatewaySenderQueue) this.queue).setBuckets(this.buckets);
+ ((ParallelGatewaySenderQueue) this.queue).setSingleHop();
}
public Set<Integer> getBuckets() {
- return this.buckets;
+ return this.buckets;
}
/**
* use in concurrent scenario where queue is to be shared among all the processors.
@@ -109,10 +115,8 @@ public class ParallelGatewaySenderEventProcessor extends
"Event Processor for GatewaySender_" + sender.getId()+"_"+ id, sender);
this.index = id;
this.nDispatcher = nDispatcher;
- //this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
- //SURANJAN for the time being assume 113, it should be totalnumbuckets in the region
this.buckets = new HashSet<Integer>();
- for(int i=index;i< 113; i = i+nDispatcher) {
+ for (int i = index; i < PartitionAttributesFactory.GLOBAL_MAX_BUCKETS_DEFAULT; i = i + nDispatcher) {
this.buckets.add(i);
}
initializeMessageQueue(sender.getId());
@@ -269,8 +273,6 @@ public class ParallelGatewaySenderEventProcessor extends
@Override
public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
- this.sender.setPrimaryLocations(locations);
+ this.sender.setPrimaryLocations(locations);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 170a965..437445b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1010,25 +1010,27 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
BucketRegion bucket = bucketEntry.getValue();
if (bucket.getBucketAdvisor().isPrimary()) {
int bId = bucket.getId();
- if(this.buckets.isEmpty() && !singleHop){
- if (bId % this.nDispatcher == this.index) {
- thisProcessorBuckets.add(bId);
- }
- }else {
- if(this.buckets.contains(bId)) {
- thisProcessorBuckets.add(bId);
- }
+ if (this.buckets.isEmpty() && !singleHop) {
+ if (bId % this.nDispatcher == this.index) {
+ thisProcessorBuckets.add(bId);
+ }
+ } else {
+ if (this.buckets.contains(bId)) {
+ thisProcessorBuckets.add(bId);
+ }
}
}
}
-
+
if (logger.isDebugEnabled()) {
- logger.debug("getRandomPrimaryBucket: allocated {} for this processor", this.buckets);
- }
-
+ logger.debug("getRandomPrimaryBucket: allocated {} for this processor",
+ this.buckets);
+ }
+
if (logger.isDebugEnabled()) {
- logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), thisProcessorBuckets.size());
- }
+ logger.debug("getRandomPrimaryBucket: total {} for this processor: {}",
+ allBuckets.size(), thisProcessorBuckets.size());
+ }
int nTry = thisProcessorBuckets.size();
@@ -1057,18 +1059,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return -1;
}
- public Set<Integer> getAllBucket() {
- PartitionedRegion prQ = getRandomShadowPR();
-
- if (prQ != null) {
- Set<Integer> s = new HashSet<Integer>();
- for(int i=0;i<prQ.getTotalNumberOfBuckets(); i++) {
- s.add(i);
- }
- return s;
- }
- return Collections.EMPTY_SET;
- }
+ public Set<Integer> getAllBucket() {
+ PartitionedRegion prQ = getRandomShadowPR();
+
+ if (prQ != null) {
+ Set<Integer> s = new HashSet<Integer>();
+ for (int i = 0; i < prQ.getTotalNumberOfBuckets(); i++) {
+ s.add(i);
+ }
+ return s;
+ }
+ return Collections.EMPTY_SET;
+ }
@Override
public List take(int batchSize) throws CacheException, InterruptedException {
@@ -1127,14 +1129,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} finally {
event.release();
}
+ } else {
+ synchronized (peekedEvents) {
+ if (peekedEvents.isEmpty()) {
+ peekedEvents.notifyAll();
+ }
+ }
}
- else {
- synchronized (peekedEvents) {
- if (peekedEvents.isEmpty()) {
- peekedEvents.notifyAll();
- }
- }
- }
}
private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId,
@@ -1285,35 +1286,32 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
throw new UnsupportedOperationException();
}
- public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
- List batch = new ArrayList();
- if (this.resetLastPeeked) {
- batch.addAll(peekedEvents);
- this.resetLastPeeked = false;
- final boolean isDebugEnabled = logger.isDebugEnabled();
-
- if (isDebugEnabled) {
- StringBuffer buffer = new StringBuffer();
- for (GatewaySenderEventImpl ge : peekedEvents) {
- buffer.append("event :");
- buffer.append(ge);
- }
- logger.debug("Adding already peeked events to the batch {}",
- buffer);
- }
- }
- else {
- // ideally block
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-
- return batch;
- }
+ public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
+ List batch = new ArrayList();
+ if (this.resetLastPeeked) {
+ batch.addAll(peekedEvents);
+ this.resetLastPeeked = false;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ if (isDebugEnabled) {
+ StringBuffer buffer = new StringBuffer();
+ for (GatewaySenderEventImpl ge : peekedEvents) {
+ buffer.append("event :");
+ buffer.append(ge);
+ }
+ logger.debug("Adding already peeked events to the batch {}", buffer);
+ }
+ } else {
+ // ideally block
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ return batch;
+ }
public List peek(int batchSize, int timeToWait) throws InterruptedException,
CacheException {
@@ -1550,11 +1548,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
for (int i = 0; i < batchSize; i++) {
remove();
}
- synchronized (peekedEvents) {
- if (peekedEvents.isEmpty()) {
- peekedEvents.notifyAll();
- }
- }
+ synchronized (peekedEvents) {
+ if (peekedEvents.isEmpty()) {
+ peekedEvents.notifyAll();
+ }
+ }
}
public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
@@ -1944,21 +1942,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
}
- public void setBuckets(Set<Integer> buckets2) {
- this.buckets = buckets2;
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2, this.buckets);
- }
-
- }
+ public void setBuckets(Set<Integer> buckets2) {
+ this.buckets = buckets2;
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2,
+ this.buckets);
+ }
+
+ }
- public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
- return peekedEvents;
- }
+ public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
+ return peekedEvents;
+ }
- public void setSingleHop() {
- this.singleHop = true;
- }
+ public void setSingleHop() {
+ this.singleHop = true;
+ }
-
}