You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by su...@apache.org on 2015/11/02 06:43:34 UTC

incubator-geode git commit: WAN SingleHop Implementation

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/wan_single_hop_wip aaa5e22eb -> 37a9ee5f5


  WAN SingleHop Implementation

  Follow up checkins for fixes of issues related SerialGatewaySender,ClassCastException and NPE


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/37a9ee5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/37a9ee5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/37a9ee5f

Branch: refs/heads/feature/wan_single_hop_wip
Commit: 37a9ee5f528aefe5ae63743b80c7cb399bb7b768
Parents: aaa5e22
Author: Suranjan Kumar <sk...@pivotal.io>
Authored: Mon Nov 2 10:56:17 2015 +0530
Committer: Suranjan Kumar <sk...@pivotal.io>
Committed: Mon Nov 2 10:56:17 2015 +0530

----------------------------------------------------------------------
 .../gemfire/internal/cache/BucketAdvisor.java   |   2 +-
 .../cache/partitioned/RegionAdvisor.java        | 107 +++++++------
 .../sockets/command/GatewayReceiverCommand.java |  46 +++---
 .../AbstractGatewaySenderEventProcessor.java    |  78 ++++-----
 .../ParallelGatewaySenderEventProcessor.java    |  42 ++---
 .../parallel/ParallelGatewaySenderQueue.java    | 160 +++++++++----------
 6 files changed, 227 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 9fff9da..64d340f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1794,7 +1794,7 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
     if (!this.pRegion.isShadowPR()) {
       GemFireCacheImpl c = getProxyBucketRegion().getCache();
       List servers = null;
-      servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
+      //servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
       servers = c.getCacheServersAndGatewayReceiver();
 
       HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
index 0f9ceb6..8ba3d1f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.AbstractSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.InterestPolicy;
 import com.gemstone.gemfire.cache.LowMemoryException;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.ProfileListener;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -306,76 +308,91 @@ public class RegionAdvisor extends CacheDistributionAdvisor
   
   /**
    * get the bucket to primary serverlocation map.
+   * 
    * @return
    */
-  
+
   public Map<Integer, BucketServerLocation66> getAllPrimaryClientBucketProfiles() {
-	Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
+    Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
     for (Integer bucketId : this.clientBucketProfilesMap.keySet()) {
       for (BucketProfile profile : this.clientBucketProfilesMap.get(bucketId)) {
-	      ServerBucketProfile cProfile = (ServerBucketProfile)profile;
-	      Set<BucketServerLocation66> bucketServerLocations = cProfile.getBucketServerLocations();
-	      for(BucketServerLocation66 loc: bucketServerLocations) {
-	    	  if(loc.isPrimary()) {
-	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
-	    	  }
-	      }
+        ServerBucketProfile cProfile = (ServerBucketProfile) profile;
+        Set<BucketServerLocation66> bucketServerLocations = cProfile
+            .getBucketServerLocations();
+        for (BucketServerLocation66 loc : bucketServerLocations) {
+          if (loc.isPrimary()) {
+            bucketToPrimaryServerLocation.put(bucketId, loc);
+          }
+        }
       }
     }
-    
+
     if (getPartitionedRegion().isDataStore()) {
-      for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
+      for (Integer bucketId : getPartitionedRegion().getDataStore()
+          .getAllLocalBucketIds()) {
         BucketProfile profile = getBucketAdvisor(bucketId).getLocalProfile();
 
         if (logger.isDebugEnabled()) {
           logger.debug("The local profile is : {}", profile);
         }
-        
+
         if (profile != null) {
           if (profile instanceof ServerBucketProfile) {
-            ServerBucketProfile cProfile = (ServerBucketProfile)profile;
+            ServerBucketProfile cProfile = (ServerBucketProfile) profile;
             Set<BucketServerLocation66> bucketServerLocations = cProfile
                 .getBucketServerLocations();
-            
-            for(BucketServerLocation66 loc: bucketServerLocations) {
-            	if(loc.isPrimary()) {
-  	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
-  	    	  }
+
+            for (BucketServerLocation66 loc : bucketServerLocations) {
+              if (loc.isPrimary()) {
+                bucketToPrimaryServerLocation.put(bucketId, loc);
+              }
             }
           }
         }
       }
     }
     if (logger.isDebugEnabled()) {
-        logger.debug("The bucket to primary serverlocation is : {}", bucketToPrimaryServerLocation);
-      }
+      logger.debug("The bucket to primary serverlocation is : {}",
+          bucketToPrimaryServerLocation);
+    }
     return bucketToPrimaryServerLocation;
   }
-  
-	public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
-		Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
-
-		for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
-				.values()) {
-			if (recieverToPrimaryBucketMap.containsKey(location)) {
-				Set<Integer> l = recieverToPrimaryBucketMap.get(location);
-				l.add(location.getBucketId());
-			} else {
-				Set<Integer> bucketList = new HashSet<Integer>();
-				bucketList.add(location.getBucketId());
-				recieverToPrimaryBucketMap.put(location, bucketList);
-			}
-		}
-		
-		Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
-		
-		for(BucketServerLocation66 bl: recieverToPrimaryBucketMap.keySet()) {
-			recieverToPrimaryBucketMapCopy.put(
-					new ServerLocation(bl.getHostName(), bl.getPort()),
-					recieverToPrimaryBucketMap.get(bl));
-		}
-		return recieverToPrimaryBucketMapCopy;
-	}
+
+  public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
+    Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
+
+    for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
+        .values()) {
+      if (recieverToPrimaryBucketMap.containsKey(location)) {
+        Set<Integer> l = recieverToPrimaryBucketMap.get(location);
+        l.add(location.getBucketId());
+      } else {
+        Set<Integer> bucketList = new HashSet<Integer>();
+        bucketList.add(location.getBucketId());
+        recieverToPrimaryBucketMap.put(location, bucketList);
+      }
+    }
+
+    Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
+    
+   // GemFireCacheImpl c = getPartitionedRegion().getCache();
+   // Set<GatewayReceiver> s = c.getGatewayReceivers();
+    
+    for (BucketServerLocation66 bl : recieverToPrimaryBucketMap.keySet()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("The servergroup is : {}",  bl.getServerGroups());
+      }
+      List<String> allgroups = Arrays.asList(bl.getServerGroups());
+      boolean contains = allgroups.contains(GatewayReceiver.RECEIVER_GROUP);
+      
+      if (contains) {
+        recieverToPrimaryBucketMapCopy.put(
+            new ServerLocation(bl.getHostName(), bl.getPort()),
+            recieverToPrimaryBucketMap.get(bl));
+      }
+    }
+    return recieverToPrimaryBucketMapCopy;
+  }
   
   public ConcurrentHashMap<Integer, Set<ServerBucketProfile>> getAllClientBucketProfilesTest() {
     ConcurrentHashMap<Integer, Set<ServerBucketProfile>> map = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 9334d4f..f39b9ba 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -754,29 +754,31 @@ public class GatewayReceiverCommand extends BaseCommand {
     replyMsg.setTransactionId(msg.getTransactionId());
     
     boolean nwhop = false;
-    if(region instanceof PartitionedRegion ) {
-    	PartitionedRegion pr = (PartitionedRegion)region;
-    	nwhop = pr.isNetworkHop().byteValue() != (byte)0;
+    if (region instanceof PartitionedRegion) {
+      PartitionedRegion pr = (PartitionedRegion) region;
+      nwhop = pr.isNetworkHop().byteValue() != (byte) 0;
     }
-    if (nwhop) {
-    	PartitionedRegion pr = (PartitionedRegion)region;
-    	Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
-    	        .getRegionAdvisor().getAllPrimaryBucketLocations();
-    	if (logger.isDebugEnabled()) {
-    	    logger.debug("Replying with the primary locations as nwhop occurred {}", recieverToPrimaryBucketMap);
-    	}
-        replyMsg.setNumberOfParts(3);
-        replyMsg.addIntPart(batchId);
-        replyMsg.addIntPart(numberOfEvents);
-        replyMsg.addObjPart(recieverToPrimaryBucketMap);
-    }
-    else{
-    	if (logger.isDebugEnabled()) {
-    	    logger.debug("Not Replying with the primary locations as no nwhop occurred");
-    	}
-    	replyMsg.setNumberOfParts(2);
-        replyMsg.addIntPart(batchId);
-        replyMsg.addIntPart(numberOfEvents);
+    if (nwhop && (servConn.getClientVersion() != null)
+        && (servConn.getClientVersion().compareTo(Version.GFE_90) >= 0)) {
+      PartitionedRegion pr = (PartitionedRegion) region;
+      Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
+          .getRegionAdvisor().getAllPrimaryBucketLocations();
+      if (logger.isDebugEnabled()) {
+        logger.debug("Replying with the primary locations as nwhop occurred {}",
+            recieverToPrimaryBucketMap);
+      }
+      replyMsg.setNumberOfParts(3);
+      replyMsg.addIntPart(batchId);
+      replyMsg.addIntPart(numberOfEvents);
+      replyMsg.addObjPart(recieverToPrimaryBucketMap);
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Not Replying with the primary locations as no nwhop occurred");
+      }
+      replyMsg.setNumberOfParts(2);
+      replyMsg.addIntPart(batchId);
+      replyMsg.addIntPart(numberOfEvents);
     }
     
     replyMsg.setTransactionId(msg.getTransactionId());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e6e9207..1886811 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -491,12 +491,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
               }
             }*/
             }
-            if(this.resumeWhenPeekedEventsEmpty) {
-            	if(this.queue instanceof ParallelGatewaySenderQueue) {
-            		events = ((ParallelGatewaySenderQueue)this.queue).peekAlreadyPeekedEvents();
-            	}
-            }else{
-            	events = this.queue.peek(batchSize, batchTimeInterval);
+            if (this.resumeWhenPeekedEventsEmpty) {
+              if (this.queue instanceof ParallelGatewaySenderQueue) {
+                events = ((ParallelGatewaySenderQueue) this.queue)
+                    .peekAlreadyPeekedEvents();
+              }
+            } else {
+              events = this.queue.peek(batchSize, batchTimeInterval);
             }
           } catch (InterruptedException e) {
             interrupted = true;
@@ -1377,37 +1378,36 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
   public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
   }
 
-	public boolean isPeekedEventEmpty() {
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK Getting called for this processor  " + this);
-		}
-		BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
-				.getPeekedEvents();
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK the peeked evets are for this processor  "
-					+ peekedEvents);
-			
-		}
-		
-//		synchronized (peekedEvents) {
-//			while(!peekedEvents.isEmpty()) {
-//				try {
-//					peekedEvents.wait();
-//				} catch (InterruptedException e) {
-//					e.printStackTrace();
-//				}
-//			}
-//		}
-		
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK WAIT COMPLETE  "
-					+ peekedEvents);
-			
-		}
-		return peekedEvents.isEmpty();
-	}
-
-	public void markResumeWhenPeekedEventEmpty(boolean flag) {
-		this.resumeWhenPeekedEventsEmpty = flag;
-	}   
+  public boolean isPeekedEventEmpty() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("SKSKSK Getting called for this processor  " + this);
+    }
+    BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
+        .getPeekedEvents();
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "SKSKSK the peeked evets are for this processor  " + peekedEvents);
+
+    }
+
+    // synchronized (peekedEvents) {
+    // while(!peekedEvents.isEmpty()) {
+    // try {
+    // peekedEvents.wait();
+    // } catch (InterruptedException e) {
+    // e.printStackTrace();
+    // }
+    // }
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("SKSKSK WAIT COMPLETE  " + peekedEvents);
+
+    }
+    return peekedEvents.isEmpty();
+  }
+
+  public void markResumeWhenPeekedEventEmpty(boolean flag) {
+    this.resumeWhenPeekedEventsEmpty = flag;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 9bc3ae8..2181114 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
@@ -51,6 +52,7 @@ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
 import com.gemstone.gemfire.internal.size.SingleObjectSizer;
 
 import java.io.IOException;
@@ -82,23 +84,27 @@ public class ParallelGatewaySenderEventProcessor extends
     initializeMessageQueue(sender.getId());
     setDaemon(true);
   }
-  
-  public synchronized void  setBuckets(Set<Integer> s) {
-	  if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderEventProcessor before setting: {} , {}", s, this.buckets);
-	  }
-	  this.buckets.clear();
-	  this.buckets.addAll(s);
-	  if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderEventProcessor after settingn: {} , {}", s, this.buckets);
-	  }
-	  
-	  ((ParallelGatewaySenderQueue)this.queue).setBuckets(this.buckets);
-	  ((ParallelGatewaySenderQueue)this.queue).setSingleHop();
+
+  public synchronized void setBuckets(Set<Integer> s) {
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "ParallelGatewaySenderEventProcessor before setting: {} , {}", s,
+          this.buckets);
+    }
+    this.buckets.clear();
+    this.buckets.addAll(s);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "ParallelGatewaySenderEventProcessor after settingn: {} , {}", s,
+          this.buckets);
+    }
+
+    ((ParallelGatewaySenderQueue) this.queue).setBuckets(this.buckets);
+    ((ParallelGatewaySenderQueue) this.queue).setSingleHop();
   }
   
   public Set<Integer> getBuckets() {
-	  return this.buckets;
+    return this.buckets;
   }
   /**
    * use in concurrent scenario where queue is to be shared among all the processors.
@@ -109,10 +115,8 @@ public class ParallelGatewaySenderEventProcessor extends
         "Event Processor for GatewaySender_" + sender.getId()+"_"+ id, sender);
     this.index = id;
     this.nDispatcher = nDispatcher;
-    //this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
-    //SURANJAN for the time being assume 113, it should be totalnumbuckets in the region
     this.buckets = new HashSet<Integer>();
-    for(int i=index;i< 113; i = i+nDispatcher) {
+    for (int i = index; i < PartitionAttributesFactory.GLOBAL_MAX_BUCKETS_DEFAULT; i = i + nDispatcher) {
       this.buckets.add(i);
     }
     initializeMessageQueue(sender.getId());
@@ -269,8 +273,6 @@ public class ParallelGatewaySenderEventProcessor extends
 
   @Override
   public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
-	  this.sender.setPrimaryLocations(locations);
+    this.sender.setPrimaryLocations(locations);
   }
-  
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37a9ee5f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 170a965..437445b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1010,25 +1010,27 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         BucketRegion bucket = bucketEntry.getValue();
         if (bucket.getBucketAdvisor().isPrimary()) {
           int bId = bucket.getId();
-          if(this.buckets.isEmpty() && !singleHop){
-        	if (bId % this.nDispatcher == this.index) {
-        	  thisProcessorBuckets.add(bId);
-        	}
-          }else {
-        	  if(this.buckets.contains(bId)) {
-        		thisProcessorBuckets.add(bId);
-              }
+          if (this.buckets.isEmpty() && !singleHop) {
+            if (bId % this.nDispatcher == this.index) {
+              thisProcessorBuckets.add(bId);
+            }
+          } else {
+            if (this.buckets.contains(bId)) {
+              thisProcessorBuckets.add(bId);
+            }
           }
         }
       }
-      
+
       if (logger.isDebugEnabled()) {
-          logger.debug("getRandomPrimaryBucket: allocated {} for this processor", this.buckets);
-        }      
-      
+        logger.debug("getRandomPrimaryBucket: allocated {} for this processor",
+            this.buckets);
+      }
+
       if (logger.isDebugEnabled()) {
-        logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), thisProcessorBuckets.size());
-      }           
+        logger.debug("getRandomPrimaryBucket: total {} for this processor: {}",
+            allBuckets.size(), thisProcessorBuckets.size());
+      }          
       
       int nTry =  thisProcessorBuckets.size();
       
@@ -1057,18 +1059,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return -1;
   }
   
-	public Set<Integer> getAllBucket() {
-		PartitionedRegion prQ = getRandomShadowPR();
-		
-		if (prQ != null) {
-			Set<Integer> s = new HashSet<Integer>();
-			for(int i=0;i<prQ.getTotalNumberOfBuckets(); i++) {
-				s.add(i);
-			}
-			return s;
-		}
-		return Collections.EMPTY_SET;
-	}
+  public Set<Integer> getAllBucket() {
+    PartitionedRegion prQ = getRandomShadowPR();
+
+    if (prQ != null) {
+      Set<Integer> s = new HashSet<Integer>();
+      for (int i = 0; i < prQ.getTotalNumberOfBuckets(); i++) {
+        s.add(i);
+      }
+      return s;
+    }
+    return Collections.EMPTY_SET;
+  }
   
   @Override
   public List take(int batchSize) throws CacheException, InterruptedException {
@@ -1127,14 +1129,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       } finally {
         event.release();
       }
+    } else {
+      synchronized (peekedEvents) {
+        if (peekedEvents.isEmpty()) {
+          peekedEvents.notifyAll();
+        }
+      }
     }
-    else {
-			synchronized (peekedEvents) {
-				if (peekedEvents.isEmpty()) {
-					peekedEvents.notifyAll();
-				}
-			}
-		}
   }
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId,
@@ -1285,35 +1286,32 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     throw new UnsupportedOperationException();
   }
 
-	public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
-		List batch = new ArrayList();
-		if (this.resetLastPeeked) {
-			batch.addAll(peekedEvents);
-			this.resetLastPeeked = false;
-			final boolean isDebugEnabled = logger.isDebugEnabled();
-
-			if (isDebugEnabled) {
-				StringBuffer buffer = new StringBuffer();
-				for (GatewaySenderEventImpl ge : peekedEvents) {
-					buffer.append("event :");
-					buffer.append(ge);
-				}
-				logger.debug("Adding already peeked events to the batch {}",
-						buffer);
-			}
-		}
-		else {
-			// ideally block
-			try {
-				Thread.sleep(200);
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-		
-		
-		return batch;
-	}
+  public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
+    List batch = new ArrayList();
+    if (this.resetLastPeeked) {
+      batch.addAll(peekedEvents);
+      this.resetLastPeeked = false;
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+
+      if (isDebugEnabled) {
+        StringBuffer buffer = new StringBuffer();
+        for (GatewaySenderEventImpl ge : peekedEvents) {
+          buffer.append("event :");
+          buffer.append(ge);
+        }
+        logger.debug("Adding already peeked events to the batch {}", buffer);
+      }
+    } else {
+      // ideally block
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    return batch;
+  }
   
   public List peek(int batchSize, int timeToWait) throws InterruptedException,
       CacheException {
@@ -1550,11 +1548,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (int i = 0; i < batchSize; i++) {
       remove();
     }
-	synchronized (peekedEvents) {
-		if (peekedEvents.isEmpty()) {
-			peekedEvents.notifyAll();
-		}
-	}
+    synchronized (peekedEvents) {
+      if (peekedEvents.isEmpty()) {
+        peekedEvents.notifyAll();
+      }
+    }
   }
   
   public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
@@ -1944,21 +1942,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   	throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
   }
 
-	public void setBuckets(Set<Integer> buckets2) {
-		this.buckets = buckets2;
-		if (logger.isDebugEnabled()) {
-	          logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2, this.buckets);
-	    }
-		
-	}
+  public void setBuckets(Set<Integer> buckets2) {
+    this.buckets = buckets2;
+    if (logger.isDebugEnabled()) {
+      logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2,
+          this.buckets);
+    }
+
+  }
 
-	public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
-		return peekedEvents;
-	}
+  public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
+    return peekedEvents;
+  }
 
-	public void setSingleHop() {
-		this.singleHop = true;
-	}
+  public void setSingleHop() {
+    this.singleHop = true;
+  }
 
-	
 }