You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by pr...@apache.org on 2018/11/16 23:33:00 UTC

[geode] branch feature/wan_single_hop_wip created (now 37a9ee5)

This is an automated email from the ASF dual-hosted git repository.

prhomberg pushed a change to branch feature/wan_single_hop_wip
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 37a9ee5    WAN SingleHop Implementation

This branch includes the following new commits:

     new b18c1c5      SingleHop implementation for WAN
     new aaa5e22  Merge branch 'master' into feature/wan_single_hop_wip
     new 37a9ee5    WAN SingleHop Implementation

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 03/03: WAN SingleHop Implementation

Posted by pr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

prhomberg pushed a commit to branch feature/wan_single_hop_wip
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 37a9ee5f528aefe5ae63743b80c7cb399bb7b768
Author: Suranjan Kumar <sk...@pivotal.io>
AuthorDate: Mon Nov 2 10:56:17 2015 +0530

      WAN SingleHop Implementation
    
      Follow up checkins for fixes of issues related SerialGatewaySender,ClassCastException and NPE
---
 .../gemfire/internal/cache/BucketAdvisor.java      |   2 +-
 .../internal/cache/partitioned/RegionAdvisor.java  | 107 ++++++++------
 .../sockets/command/GatewayReceiverCommand.java    |  46 +++---
 .../wan/AbstractGatewaySenderEventProcessor.java   |  78 +++++-----
 .../ParallelGatewaySenderEventProcessor.java       |  42 +++---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 160 ++++++++++-----------
 6 files changed, 227 insertions(+), 208 deletions(-)

diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 9fff9da..64d340f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1794,7 +1794,7 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
     if (!this.pRegion.isShadowPR()) {
       GemFireCacheImpl c = getProxyBucketRegion().getCache();
       List servers = null;
-      servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
+      //servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
       servers = c.getCacheServersAndGatewayReceiver();
 
       HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>();
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
index 0f9ceb6..8ba3d1f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.AbstractSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.InterestPolicy;
 import com.gemstone.gemfire.cache.LowMemoryException;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.ProfileListener;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -306,76 +308,91 @@ public class RegionAdvisor extends CacheDistributionAdvisor
   
   /**
    * get the bucket to primary serverlocation map.
+   * 
    * @return
    */
-  
+
   public Map<Integer, BucketServerLocation66> getAllPrimaryClientBucketProfiles() {
-	Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
+    Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
     for (Integer bucketId : this.clientBucketProfilesMap.keySet()) {
       for (BucketProfile profile : this.clientBucketProfilesMap.get(bucketId)) {
-	      ServerBucketProfile cProfile = (ServerBucketProfile)profile;
-	      Set<BucketServerLocation66> bucketServerLocations = cProfile.getBucketServerLocations();
-	      for(BucketServerLocation66 loc: bucketServerLocations) {
-	    	  if(loc.isPrimary()) {
-	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
-	    	  }
-	      }
+        ServerBucketProfile cProfile = (ServerBucketProfile) profile;
+        Set<BucketServerLocation66> bucketServerLocations = cProfile
+            .getBucketServerLocations();
+        for (BucketServerLocation66 loc : bucketServerLocations) {
+          if (loc.isPrimary()) {
+            bucketToPrimaryServerLocation.put(bucketId, loc);
+          }
+        }
       }
     }
-    
+
     if (getPartitionedRegion().isDataStore()) {
-      for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
+      for (Integer bucketId : getPartitionedRegion().getDataStore()
+          .getAllLocalBucketIds()) {
         BucketProfile profile = getBucketAdvisor(bucketId).getLocalProfile();
 
         if (logger.isDebugEnabled()) {
           logger.debug("The local profile is : {}", profile);
         }
-        
+
         if (profile != null) {
           if (profile instanceof ServerBucketProfile) {
-            ServerBucketProfile cProfile = (ServerBucketProfile)profile;
+            ServerBucketProfile cProfile = (ServerBucketProfile) profile;
             Set<BucketServerLocation66> bucketServerLocations = cProfile
                 .getBucketServerLocations();
-            
-            for(BucketServerLocation66 loc: bucketServerLocations) {
-            	if(loc.isPrimary()) {
-  	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
-  	    	  }
+
+            for (BucketServerLocation66 loc : bucketServerLocations) {
+              if (loc.isPrimary()) {
+                bucketToPrimaryServerLocation.put(bucketId, loc);
+              }
             }
           }
         }
       }
     }
     if (logger.isDebugEnabled()) {
-        logger.debug("The bucket to primary serverlocation is : {}", bucketToPrimaryServerLocation);
-      }
+      logger.debug("The bucket to primary serverlocation is : {}",
+          bucketToPrimaryServerLocation);
+    }
     return bucketToPrimaryServerLocation;
   }
-  
-	public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
-		Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
-
-		for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
-				.values()) {
-			if (recieverToPrimaryBucketMap.containsKey(location)) {
-				Set<Integer> l = recieverToPrimaryBucketMap.get(location);
-				l.add(location.getBucketId());
-			} else {
-				Set<Integer> bucketList = new HashSet<Integer>();
-				bucketList.add(location.getBucketId());
-				recieverToPrimaryBucketMap.put(location, bucketList);
-			}
-		}
-		
-		Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
-		
-		for(BucketServerLocation66 bl: recieverToPrimaryBucketMap.keySet()) {
-			recieverToPrimaryBucketMapCopy.put(
-					new ServerLocation(bl.getHostName(), bl.getPort()),
-					recieverToPrimaryBucketMap.get(bl));
-		}
-		return recieverToPrimaryBucketMapCopy;
-	}
+
+  public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
+    Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
+
+    for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
+        .values()) {
+      if (recieverToPrimaryBucketMap.containsKey(location)) {
+        Set<Integer> l = recieverToPrimaryBucketMap.get(location);
+        l.add(location.getBucketId());
+      } else {
+        Set<Integer> bucketList = new HashSet<Integer>();
+        bucketList.add(location.getBucketId());
+        recieverToPrimaryBucketMap.put(location, bucketList);
+      }
+    }
+
+    Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
+    
+   // GemFireCacheImpl c = getPartitionedRegion().getCache();
+   // Set<GatewayReceiver> s = c.getGatewayReceivers();
+    
+    for (BucketServerLocation66 bl : recieverToPrimaryBucketMap.keySet()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("The servergroup is : {}",  bl.getServerGroups());
+      }
+      List<String> allgroups = Arrays.asList(bl.getServerGroups());
+      boolean contains = allgroups.contains(GatewayReceiver.RECEIVER_GROUP);
+      
+      if (contains) {
+        recieverToPrimaryBucketMapCopy.put(
+            new ServerLocation(bl.getHostName(), bl.getPort()),
+            recieverToPrimaryBucketMap.get(bl));
+      }
+    }
+    return recieverToPrimaryBucketMapCopy;
+  }
   
   public ConcurrentHashMap<Integer, Set<ServerBucketProfile>> getAllClientBucketProfilesTest() {
     ConcurrentHashMap<Integer, Set<ServerBucketProfile>> map = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 9334d4f..f39b9ba 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -754,29 +754,31 @@ public class GatewayReceiverCommand extends BaseCommand {
     replyMsg.setTransactionId(msg.getTransactionId());
     
     boolean nwhop = false;
-    if(region instanceof PartitionedRegion ) {
-    	PartitionedRegion pr = (PartitionedRegion)region;
-    	nwhop = pr.isNetworkHop().byteValue() != (byte)0;
+    if (region instanceof PartitionedRegion) {
+      PartitionedRegion pr = (PartitionedRegion) region;
+      nwhop = pr.isNetworkHop().byteValue() != (byte) 0;
     }
-    if (nwhop) {
-    	PartitionedRegion pr = (PartitionedRegion)region;
-    	Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
-    	        .getRegionAdvisor().getAllPrimaryBucketLocations();
-    	if (logger.isDebugEnabled()) {
-    	    logger.debug("Replying with the primary locations as nwhop occurred {}", recieverToPrimaryBucketMap);
-    	}
-        replyMsg.setNumberOfParts(3);
-        replyMsg.addIntPart(batchId);
-        replyMsg.addIntPart(numberOfEvents);
-        replyMsg.addObjPart(recieverToPrimaryBucketMap);
-    }
-    else{
-    	if (logger.isDebugEnabled()) {
-    	    logger.debug("Not Replying with the primary locations as no nwhop occurred");
-    	}
-    	replyMsg.setNumberOfParts(2);
-        replyMsg.addIntPart(batchId);
-        replyMsg.addIntPart(numberOfEvents);
+    if (nwhop && (servConn.getClientVersion() != null)
+        && (servConn.getClientVersion().compareTo(Version.GFE_90) >= 0)) {
+      PartitionedRegion pr = (PartitionedRegion) region;
+      Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
+          .getRegionAdvisor().getAllPrimaryBucketLocations();
+      if (logger.isDebugEnabled()) {
+        logger.debug("Replying with the primary locations as nwhop occurred {}",
+            recieverToPrimaryBucketMap);
+      }
+      replyMsg.setNumberOfParts(3);
+      replyMsg.addIntPart(batchId);
+      replyMsg.addIntPart(numberOfEvents);
+      replyMsg.addObjPart(recieverToPrimaryBucketMap);
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Not Replying with the primary locations as no nwhop occurred");
+      }
+      replyMsg.setNumberOfParts(2);
+      replyMsg.addIntPart(batchId);
+      replyMsg.addIntPart(numberOfEvents);
     }
     
     replyMsg.setTransactionId(msg.getTransactionId());
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e6e9207..1886811 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -491,12 +491,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
               }
             }*/
             }
-            if(this.resumeWhenPeekedEventsEmpty) {
-            	if(this.queue instanceof ParallelGatewaySenderQueue) {
-            		events = ((ParallelGatewaySenderQueue)this.queue).peekAlreadyPeekedEvents();
-            	}
-            }else{
-            	events = this.queue.peek(batchSize, batchTimeInterval);
+            if (this.resumeWhenPeekedEventsEmpty) {
+              if (this.queue instanceof ParallelGatewaySenderQueue) {
+                events = ((ParallelGatewaySenderQueue) this.queue)
+                    .peekAlreadyPeekedEvents();
+              }
+            } else {
+              events = this.queue.peek(batchSize, batchTimeInterval);
             }
           } catch (InterruptedException e) {
             interrupted = true;
@@ -1377,37 +1378,36 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
   public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
   }
 
-	public boolean isPeekedEventEmpty() {
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK Getting called for this processor  " + this);
-		}
-		BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
-				.getPeekedEvents();
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK the peeked evets are for this processor  "
-					+ peekedEvents);
-			
-		}
-		
-//		synchronized (peekedEvents) {
-//			while(!peekedEvents.isEmpty()) {
-//				try {
-//					peekedEvents.wait();
-//				} catch (InterruptedException e) {
-//					e.printStackTrace();
-//				}
-//			}
-//		}
-		
-		if (logger.isDebugEnabled()) {
-			logger.debug("SKSKSK WAIT COMPLETE  "
-					+ peekedEvents);
-			
-		}
-		return peekedEvents.isEmpty();
-	}
-
-	public void markResumeWhenPeekedEventEmpty(boolean flag) {
-		this.resumeWhenPeekedEventsEmpty = flag;
-	}   
+  public boolean isPeekedEventEmpty() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("SKSKSK Getting called for this processor  " + this);
+    }
+    BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
+        .getPeekedEvents();
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "SKSKSK the peeked evets are for this processor  " + peekedEvents);
+
+    }
+
+    // synchronized (peekedEvents) {
+    // while(!peekedEvents.isEmpty()) {
+    // try {
+    // peekedEvents.wait();
+    // } catch (InterruptedException e) {
+    // e.printStackTrace();
+    // }
+    // }
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("SKSKSK WAIT COMPLETE  " + peekedEvents);
+
+    }
+    return peekedEvents.isEmpty();
+  }
+
+  public void markResumeWhenPeekedEventEmpty(boolean flag) {
+    this.resumeWhenPeekedEventsEmpty = flag;
+  }
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 9bc3ae8..2181114 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
@@ -51,6 +52,7 @@ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
 import com.gemstone.gemfire.internal.size.SingleObjectSizer;
 
 import java.io.IOException;
@@ -82,23 +84,27 @@ public class ParallelGatewaySenderEventProcessor extends
     initializeMessageQueue(sender.getId());
     setDaemon(true);
   }
-  
-  public synchronized void  setBuckets(Set<Integer> s) {
-	  if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderEventProcessor before setting: {} , {}", s, this.buckets);
-	  }
-	  this.buckets.clear();
-	  this.buckets.addAll(s);
-	  if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderEventProcessor after settingn: {} , {}", s, this.buckets);
-	  }
-	  
-	  ((ParallelGatewaySenderQueue)this.queue).setBuckets(this.buckets);
-	  ((ParallelGatewaySenderQueue)this.queue).setSingleHop();
+
+  public synchronized void setBuckets(Set<Integer> s) {
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "ParallelGatewaySenderEventProcessor before setting: {} , {}", s,
+          this.buckets);
+    }
+    this.buckets.clear();
+    this.buckets.addAll(s);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "ParallelGatewaySenderEventProcessor after settingn: {} , {}", s,
+          this.buckets);
+    }
+
+    ((ParallelGatewaySenderQueue) this.queue).setBuckets(this.buckets);
+    ((ParallelGatewaySenderQueue) this.queue).setSingleHop();
   }
   
   public Set<Integer> getBuckets() {
-	  return this.buckets;
+    return this.buckets;
   }
   /**
    * use in concurrent scenario where queue is to be shared among all the processors.
@@ -109,10 +115,8 @@ public class ParallelGatewaySenderEventProcessor extends
         "Event Processor for GatewaySender_" + sender.getId()+"_"+ id, sender);
     this.index = id;
     this.nDispatcher = nDispatcher;
-    //this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
-    //SURANJAN for the time being assume 113, it should be totalnumbuckets in the region
     this.buckets = new HashSet<Integer>();
-    for(int i=index;i< 113; i = i+nDispatcher) {
+    for (int i = index; i < PartitionAttributesFactory.GLOBAL_MAX_BUCKETS_DEFAULT; i = i + nDispatcher) {
       this.buckets.add(i);
     }
     initializeMessageQueue(sender.getId());
@@ -269,8 +273,6 @@ public class ParallelGatewaySenderEventProcessor extends
 
   @Override
   public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
-	  this.sender.setPrimaryLocations(locations);
+    this.sender.setPrimaryLocations(locations);
   }
-  
-
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 170a965..437445b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1010,25 +1010,27 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         BucketRegion bucket = bucketEntry.getValue();
         if (bucket.getBucketAdvisor().isPrimary()) {
           int bId = bucket.getId();
-          if(this.buckets.isEmpty() && !singleHop){
-        	if (bId % this.nDispatcher == this.index) {
-        	  thisProcessorBuckets.add(bId);
-        	}
-          }else {
-        	  if(this.buckets.contains(bId)) {
-        		thisProcessorBuckets.add(bId);
-              }
+          if (this.buckets.isEmpty() && !singleHop) {
+            if (bId % this.nDispatcher == this.index) {
+              thisProcessorBuckets.add(bId);
+            }
+          } else {
+            if (this.buckets.contains(bId)) {
+              thisProcessorBuckets.add(bId);
+            }
           }
         }
       }
-      
+
       if (logger.isDebugEnabled()) {
-          logger.debug("getRandomPrimaryBucket: allocated {} for this processor", this.buckets);
-        }      
-      
+        logger.debug("getRandomPrimaryBucket: allocated {} for this processor",
+            this.buckets);
+      }
+
       if (logger.isDebugEnabled()) {
-        logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), thisProcessorBuckets.size());
-      }           
+        logger.debug("getRandomPrimaryBucket: total {} for this processor: {}",
+            allBuckets.size(), thisProcessorBuckets.size());
+      }          
       
       int nTry =  thisProcessorBuckets.size();
       
@@ -1057,18 +1059,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return -1;
   }
   
-	public Set<Integer> getAllBucket() {
-		PartitionedRegion prQ = getRandomShadowPR();
-		
-		if (prQ != null) {
-			Set<Integer> s = new HashSet<Integer>();
-			for(int i=0;i<prQ.getTotalNumberOfBuckets(); i++) {
-				s.add(i);
-			}
-			return s;
-		}
-		return Collections.EMPTY_SET;
-	}
+  public Set<Integer> getAllBucket() {
+    PartitionedRegion prQ = getRandomShadowPR();
+
+    if (prQ != null) {
+      Set<Integer> s = new HashSet<Integer>();
+      for (int i = 0; i < prQ.getTotalNumberOfBuckets(); i++) {
+        s.add(i);
+      }
+      return s;
+    }
+    return Collections.EMPTY_SET;
+  }
   
   @Override
   public List take(int batchSize) throws CacheException, InterruptedException {
@@ -1127,14 +1129,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       } finally {
         event.release();
       }
+    } else {
+      synchronized (peekedEvents) {
+        if (peekedEvents.isEmpty()) {
+          peekedEvents.notifyAll();
+        }
+      }
     }
-    else {
-			synchronized (peekedEvents) {
-				if (peekedEvents.isEmpty()) {
-					peekedEvents.notifyAll();
-				}
-			}
-		}
   }
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId,
@@ -1285,35 +1286,32 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     throw new UnsupportedOperationException();
   }
 
-	public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
-		List batch = new ArrayList();
-		if (this.resetLastPeeked) {
-			batch.addAll(peekedEvents);
-			this.resetLastPeeked = false;
-			final boolean isDebugEnabled = logger.isDebugEnabled();
-
-			if (isDebugEnabled) {
-				StringBuffer buffer = new StringBuffer();
-				for (GatewaySenderEventImpl ge : peekedEvents) {
-					buffer.append("event :");
-					buffer.append(ge);
-				}
-				logger.debug("Adding already peeked events to the batch {}",
-						buffer);
-			}
-		}
-		else {
-			// ideally block
-			try {
-				Thread.sleep(200);
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-		
-		
-		return batch;
-	}
+  public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
+    List batch = new ArrayList();
+    if (this.resetLastPeeked) {
+      batch.addAll(peekedEvents);
+      this.resetLastPeeked = false;
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+
+      if (isDebugEnabled) {
+        StringBuffer buffer = new StringBuffer();
+        for (GatewaySenderEventImpl ge : peekedEvents) {
+          buffer.append("event :");
+          buffer.append(ge);
+        }
+        logger.debug("Adding already peeked events to the batch {}", buffer);
+      }
+    } else {
+      // ideally block
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    return batch;
+  }
   
   public List peek(int batchSize, int timeToWait) throws InterruptedException,
       CacheException {
@@ -1550,11 +1548,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (int i = 0; i < batchSize; i++) {
       remove();
     }
-	synchronized (peekedEvents) {
-		if (peekedEvents.isEmpty()) {
-			peekedEvents.notifyAll();
-		}
-	}
+    synchronized (peekedEvents) {
+      if (peekedEvents.isEmpty()) {
+        peekedEvents.notifyAll();
+      }
+    }
   }
   
   public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
@@ -1944,21 +1942,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   	throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
   }
 
-	public void setBuckets(Set<Integer> buckets2) {
-		this.buckets = buckets2;
-		if (logger.isDebugEnabled()) {
-	          logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2, this.buckets);
-	    }
-		
-	}
+  public void setBuckets(Set<Integer> buckets2) {
+    this.buckets = buckets2;
+    if (logger.isDebugEnabled()) {
+      logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2,
+          this.buckets);
+    }
+
+  }
 
-	public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
-		return peekedEvents;
-	}
+  public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
+    return peekedEvents;
+  }
 
-	public void setSingleHop() {
-		this.singleHop = true;
-	}
+  public void setSingleHop() {
+    this.singleHop = true;
+  }
 
-	
 }


[geode] 02/03: Merge branch 'master' into feature/wan_single_hop_wip

Posted by pr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

prhomberg pushed a commit to branch feature/wan_single_hop_wip
in repository https://gitbox.apache.org/repos/asf/geode.git

commit aaa5e22eb80cafbbb85f43661ac6638856c7fee2
Merge: b18c1c5 6902465
Author: Suranjan Kumar <sk...@pivotal.io>
AuthorDate: Fri Oct 30 19:28:46 2015 +0530

    Merge branch 'master' into feature/wan_single_hop_wip



[geode] 01/03: SingleHop implementation for WAN

Posted by pr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

prhomberg pushed a commit to branch feature/wan_single_hop_wip
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b18c1c59f2b8775df543b5066fa0297bec11d938
Author: Suranjan Kumar <sk...@pivotal.io>
AuthorDate: Fri Oct 30 18:07:23 2015 +0530

        SingleHop implementation for WAN
    
        Keep track of network hop for wan related operations on remote site
        In case of network hop, send the map of GatewayReceiver to primaryBucketIds to sending site
        On sending site, flush all the already peeked events and reditribute the bucket set to processors
        Create extra processors if reuired, bounded by user set totalDispatchers
---
 .../gemfire/cache/client/internal/PoolImpl.java    |   3 +
 .../internal/HDFSParallelGatewaySenderQueue.java   |  16 +--
 .../distributed/internal/DistributionConfig.java   |   2 +-
 .../internal/cache/AbstractBucketRegionQueue.java  |   1 +
 .../gemfire/internal/cache/BucketAdvisor.java      |   3 +-
 .../internal/cache/partitioned/RegionAdvisor.java  |  74 ++++++++++++
 .../sockets/command/GatewayReceiverCommand.java    |  47 +++++++-
 .../internal/cache/wan/AbstractGatewaySender.java  |  11 ++
 .../wan/AbstractGatewaySenderEventProcessor.java   |  79 +++++++++++--
 ...currentParallelGatewaySenderEventProcessor.java |  39 +++----
 .../ConcurrentParallelGatewaySenderQueue.java      |  53 ++++-----
 .../ParallelGatewaySenderEventProcessor.java       |  37 +++++-
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 124 +++++++++++++++++++--
 13 files changed, 408 insertions(+), 81 deletions(-)

diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
index 8aa5ce2..9c7003f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
@@ -844,6 +844,9 @@ public class PoolImpl implements InternalPool {
     return manager.borrowConnection(45000L);
   }
   
+  public Connection acquireConnectionForWAN(ServerLocation loc) {
+	return manager.borrowConnection(loc,45000L,false);
+  }
   /**
    * Hook to return connections that were acquired using 
    * acquireConnection.
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
index 87e8ce4..8965185 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
@@ -18,11 +18,11 @@
 package com.gemstone.gemfire.cache.hdfs.internal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.Region;
@@ -53,7 +53,7 @@ public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
   public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
       Set<Region> userPRs, int idx, int nDispatcher) {
      
-    super(sender, userPRs, idx, nDispatcher);
+    super(sender, userPRs, idx, nDispatcher,Collections.<Integer> emptySet());
     //only first dispatcher Hemant?
     if (sender.getBucketSorted() && this.index == 0) {
       rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
@@ -124,7 +124,7 @@ public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
     if (this.resetLastPeeked) {
       int previousBucketId = -1;
       boolean stillPrimary = true; 
-      Iterator<GatewaySenderEventImpl>  iter = peekedEvents.iterator();
+      Iterator<GatewaySenderEventImpl>  iter = getPeekedEvents().iterator();
       // we need to remove the events of the bucket that are no more primary on 
       // this node as they cannot be persisted from this node. 
       while(iter.hasNext()) {
@@ -213,7 +213,7 @@ public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
       if (list != null && list.size() != 0 ) {
         for (Object object : list) {
           batch.add(object);
-          peekedEvents.add((HDFSGatewayEventImpl)object);
+          getPeekedEvents().add((HDFSGatewayEventImpl)object);
         }
       }
     }
@@ -252,8 +252,8 @@ public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
     int destroyed = 0;
     HDFSGatewayEventImpl event = null;
     
-    if (this.peekedEvents.size() > 0)
-      event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+    if (this.getPeekedEvents().size() > 0)
+      event = (HDFSGatewayEventImpl)this.getPeekedEvents().remove();
     
     while (event != null && destroyed < batchSize) {
       Region currentRegion = event.getRegion();
@@ -269,12 +269,12 @@ public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
         destroyedSeqNum.add(event.getShadowKey());
         destroyed++;
 
-        if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
+        if (this.getPeekedEvents().size() == 0 || (destroyed) >= batchSize) {
           event = null; 
           break;
         }
 
-        event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+        event = (HDFSGatewayEventImpl)this.getPeekedEvents().remove();
 
         bucketId = event.getBucketId();
 
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
index 1504421..f21bbdf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionConfig.java
@@ -472,7 +472,7 @@ public interface DistributionConfig extends Config, LogConfig {
    * The default log level.
    * <p> Actual value of this constant is {@link InternalLogWriter#CONFIG_LEVEL}.
    */
-  public static final int DEFAULT_LOG_LEVEL = InternalLogWriter.CONFIG_LEVEL;
+  public static final int DEFAULT_LOG_LEVEL = InternalLogWriter.FINE_LEVEL;
   /**
    * The minimum log level.
    * <p> Actual value of this constant is {@link InternalLogWriter#ALL_LEVEL}.
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index 2c8f493..487e6be 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -527,6 +527,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
   @Override
   public void afterAcquiringPrimaryState() {
     super.afterAcquiringPrimaryState();
+    // add the serverlocation of primary receiver
     notifyEventProcessor();
   }
   
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 1eaabf1..9fff9da 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1794,7 +1794,8 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
     if (!this.pRegion.isShadowPR()) {
       GemFireCacheImpl c = getProxyBucketRegion().getCache();
       List servers = null;
-      servers = c.getCacheServers();
+      servers = c.getCacheServers();// SURANJAN change it to servers and receivrs later on
+      servers = c.getCacheServersAndGatewayReceiver();
 
       HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>();
       for (Object object : servers) {
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
index 03df3fb..0f9ceb6 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/RegionAdvisor.java
@@ -47,6 +47,7 @@ import com.gemstone.gemfire.cache.LowMemoryException;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.ProfileListener;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
@@ -303,6 +304,79 @@ public class RegionAdvisor extends CacheDistributionAdvisor
     return bucketToServerLocations;
   }
   
+  /**
+   * get the bucket to primary serverlocation map.
+   * @return
+   */
+  
+  public Map<Integer, BucketServerLocation66> getAllPrimaryClientBucketProfiles() {
+	Map<Integer, BucketServerLocation66> bucketToPrimaryServerLocation = new HashMap<Integer, BucketServerLocation66>();
+    for (Integer bucketId : this.clientBucketProfilesMap.keySet()) {
+      for (BucketProfile profile : this.clientBucketProfilesMap.get(bucketId)) {
+	      ServerBucketProfile cProfile = (ServerBucketProfile)profile;
+	      Set<BucketServerLocation66> bucketServerLocations = cProfile.getBucketServerLocations();
+	      for(BucketServerLocation66 loc: bucketServerLocations) {
+	    	  if(loc.isPrimary()) {
+	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
+	    	  }
+	      }
+      }
+    }
+    
+    if (getPartitionedRegion().isDataStore()) {
+      for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
+        BucketProfile profile = getBucketAdvisor(bucketId).getLocalProfile();
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("The local profile is : {}", profile);
+        }
+        
+        if (profile != null) {
+          if (profile instanceof ServerBucketProfile) {
+            ServerBucketProfile cProfile = (ServerBucketProfile)profile;
+            Set<BucketServerLocation66> bucketServerLocations = cProfile
+                .getBucketServerLocations();
+            
+            for(BucketServerLocation66 loc: bucketServerLocations) {
+            	if(loc.isPrimary()) {
+  	    		  bucketToPrimaryServerLocation.put(bucketId, loc);
+  	    	  }
+            }
+          }
+        }
+      }
+    }
+    if (logger.isDebugEnabled()) {
+        logger.debug("The bucket to primary serverlocation is : {}", bucketToPrimaryServerLocation);
+      }
+    return bucketToPrimaryServerLocation;
+  }
+  
+	public Map<ServerLocation, Set<Integer>> getAllPrimaryBucketLocations() {
+		Map<BucketServerLocation66, Set<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, Set<Integer>>();
+
+		for (BucketServerLocation66 location : getAllPrimaryClientBucketProfiles()
+				.values()) {
+			if (recieverToPrimaryBucketMap.containsKey(location)) {
+				Set<Integer> l = recieverToPrimaryBucketMap.get(location);
+				l.add(location.getBucketId());
+			} else {
+				Set<Integer> bucketList = new HashSet<Integer>();
+				bucketList.add(location.getBucketId());
+				recieverToPrimaryBucketMap.put(location, bucketList);
+			}
+		}
+		
+		Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMapCopy = new ConcurrentHashMap<ServerLocation, Set<Integer>>();
+		
+		for(BucketServerLocation66 bl: recieverToPrimaryBucketMap.keySet()) {
+			recieverToPrimaryBucketMapCopy.put(
+					new ServerLocation(bl.getHostName(), bl.getPort()),
+					recieverToPrimaryBucketMap.get(bl));
+		}
+		return recieverToPrimaryBucketMapCopy;
+	}
+  
   public ConcurrentHashMap<Integer, Set<ServerBucketProfile>> getAllClientBucketProfilesTest() {
     ConcurrentHashMap<Integer, Set<ServerBucketProfile>> map = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();
     Map<Integer, List<BucketServerLocation66>> testMap = this.getAllClientBucketProfiles();
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 6c8634d..9334d4f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -22,6 +22,8 @@ package com.gemstone.gemfire.internal.cache.tier.sockets.command;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -32,12 +34,15 @@ import com.gemstone.gemfire.cache.wan.GatewayReceiver;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionStats;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.i18n.LogWriterI18n;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
 import com.gemstone.gemfire.internal.cache.tier.Command;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
@@ -77,6 +82,8 @@ public class GatewayReceiverCommand extends BaseCommand {
     CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
     GatewayReceiverStats stats = (GatewayReceiverStats)servConn.getCacheServerStats();
     EventID eventId = null;
+    //For single hop assuming that there is only one 
+    // region present
     LocalRegion region = null;
     List<BatchException70> exceptions = new ArrayList<BatchException70>();
     Throwable fatalException = null;
@@ -111,7 +118,7 @@ public class GatewayReceiverCommand extends BaseCommand {
       }
       else {
         logger.warn(LocalizedMessage.create(LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_THAT_HAS_ALREADY_BEEN_OR_IS_BEING_PROCESSED__THIS_PROCESS_BATCH_REQUEST_IS_BEING_IGNORED, batchId));
-        writeReply(msg, servConn, batchId, numberOfEvents);
+        writeReply(msg, servConn, batchId, numberOfEvents, region);
         return;
       }
       stats.incDuplicateBatchesReceived();
@@ -322,6 +329,10 @@ public class GatewayReceiverCommand extends BaseCommand {
               }
               // Attempt to create the entry
               boolean result = false;
+              
+              if (logger.isDebugEnabled()) {
+                  logger.debug(" SKSKSK Processing event with proxy {}",servConn.getProxyID());
+              }
               result = region.basicBridgeCreate(key, value, isObject, callbackArg,
                       servConn.getProxyID(), false, clientEvent, false); 
               // If the create fails (presumably because it already exists),
@@ -709,7 +720,7 @@ public class GatewayReceiverCommand extends BaseCommand {
       // batch)
       servConn.incrementLatestBatchIdReplied(batchId);
       
-      writeReply(msg, servConn, batchId, numberOfEvents);
+      writeReply(msg, servConn, batchId, numberOfEvents, region);
       servConn.setAsTrue(RESPONDED);
       stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime()
           - start);
@@ -737,13 +748,37 @@ public class GatewayReceiverCommand extends BaseCommand {
   }
 
   private void writeReply(Message msg, ServerConnection servConn, int batchId,
-      int numberOfEvents) throws IOException {
+      int numberOfEvents, LocalRegion region) throws IOException {
     Message replyMsg = servConn.getResponseMessage();
     replyMsg.setMessageType(MessageType.REPLY);
     replyMsg.setTransactionId(msg.getTransactionId());
-    replyMsg.setNumberOfParts(2);
-    replyMsg.addIntPart(batchId);
-    replyMsg.addIntPart(numberOfEvents);
+    
+    boolean nwhop = false;
+    if(region instanceof PartitionedRegion ) {
+    	PartitionedRegion pr = (PartitionedRegion)region;
+    	nwhop = pr.isNetworkHop().byteValue() != (byte)0;
+    }
+    if (nwhop) {
+    	PartitionedRegion pr = (PartitionedRegion)region;
+    	Map<ServerLocation, Set<Integer>> recieverToPrimaryBucketMap = pr
+    	        .getRegionAdvisor().getAllPrimaryBucketLocations();
+    	if (logger.isDebugEnabled()) {
+    	    logger.debug("Replying with the primary locations as nwhop occurred {}", recieverToPrimaryBucketMap);
+    	}
+        replyMsg.setNumberOfParts(3);
+        replyMsg.addIntPart(batchId);
+        replyMsg.addIntPart(numberOfEvents);
+        replyMsg.addObjPart(recieverToPrimaryBucketMap);
+    }
+    else{
+    	if (logger.isDebugEnabled()) {
+    	    logger.debug("Not Replying with the primary locations as no nwhop occurred");
+    	}
+    	replyMsg.setNumberOfParts(2);
+        replyMsg.addIntPart(batchId);
+        replyMsg.addIntPart(numberOfEvents);
+    }
+    
     replyMsg.setTransactionId(msg.getTransactionId());
     replyMsg.send(servConn);
     servConn.setAsTrue(Command.RESPONDED);
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index f1613a3..a10949c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -57,6 +59,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.ResourceEvent;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 import com.gemstone.gemfire.internal.cache.CachePerfStats;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
@@ -226,6 +229,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   
   final Object lockForConcurrentDispatcher = new Object();
 
+  private volatile Map<BucketServerLocation66, List<Integer>> recieverToPrimaryBucketMap = new ConcurrentHashMap<BucketServerLocation66, List<Integer>>();
+
   protected AbstractGatewaySender() {
   }
 
@@ -1309,4 +1314,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
       this.event.release();
     }
   }
+
+	public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
+		if(this.eventProcessor != null) {
+			this.eventProcessor.setPrimaryLocations(locations);
+		}
+	}
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index b9d877e..e6e9207 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -24,9 +24,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.logging.log4j.Logger;
 
@@ -43,8 +46,10 @@ import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedExc
 import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
 import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DistributedRegion;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
@@ -230,11 +235,19 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
   protected void eventQueueRemove() throws CacheException,
       InterruptedException {
     this.queue.remove();
+//    if(this.resumeWhenPeekedEventsEmpty && peekedEvents.isEmpty()) {
+//    	this.resumeWhenPeekedEventsEmpty = false;
+//    }
   }
 
   protected void eventQueueRemove(int size) throws CacheException {
     this.queue.remove(size);
+    
+//    if(this.resumeWhenPeekedEventsEmpty && peekedEvents.isEmpty()) {
+//    	this.resumeWhenPeekedEventsEmpty = false;
+//    }
   }
+  
 
   protected Object eventQueueTake() throws CacheException, InterruptedException {
     throw new UnsupportedOperationException();
@@ -268,7 +281,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     }
     this.isPaused = true;
   }
-
+  
   //merge44957: WHile merging 44957, need this method hence picked up this method from revision 42024.
   public void waitForDispatcherToPause() {
     if (!this.isPaused) {
@@ -320,6 +333,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
    */
   private final ConcurrentHashMap<Integer, long[]> failureLogInterval =
       new ConcurrentHashMap<Integer, long[]>();
+
+  private boolean resumeWhenPeekedEventsEmpty = false;
       
   /**
    * The maximum size of {@link #failureLogInterval} beyond which it will start
@@ -403,11 +418,11 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
       if (stopped()) {
         break;
       }
-
+      
       try {
         // Check if paused. If so, wait for resumption
         if (this.isPaused) {
-          waitForResumption();
+        	waitForResumption();
         }
 
         // Peek a batch
@@ -476,7 +491,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
               }
             }*/
             }
-            events = this.queue.peek(batchSize, batchTimeInterval);
+            if(this.resumeWhenPeekedEventsEmpty) {
+            	if(this.queue instanceof ParallelGatewaySenderQueue) {
+            		events = ((ParallelGatewaySenderQueue)this.queue).peekAlreadyPeekedEvents();
+            	}
+            }else{
+            	events = this.queue.peek(batchSize, batchTimeInterval);
+            }
           } catch (InterruptedException e) {
             interrupted = true;
             this.sender.getCancelCriterion().checkCancelInProgress(e);
@@ -979,6 +1000,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
       eventQueueRemove(events.size());
     }
     
+    //set the latest primary locations to the sender.
+    
   }
   
   public void handleUnSuccessBatchAck(int bId) {
@@ -1241,7 +1264,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return numEventsDispatched;
   }
   public void clear(PartitionedRegion pr, int bucketId) {
-    ((ParallelGatewaySenderQueue)this.queue).clear(pr, bucketId);
+	  if(this.queue instanceof ParallelGatewaySenderQueue) {
+		  ((ParallelGatewaySenderQueue)this.queue).clear(pr, bucketId);
+	  }
 }
 
 /*public int size(PartitionedRegion pr, int bucketId)
@@ -1250,7 +1275,10 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
 }*/
 
   public void notifyEventProcessorIfRequired(int bucketId) {
-    ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();
+	  if(this.queue instanceof ParallelGatewaySenderQueue) {
+		  ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();	  
+	  }
+    
   }
 
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
@@ -1344,5 +1372,42 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
       }
       return true;
     }
-  }   
+  }
+
+  public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
+  }
+
+	public boolean isPeekedEventEmpty() {
+		if (logger.isDebugEnabled()) {
+			logger.debug("SKSKSK Getting called for this processor  " + this);
+		}
+		BlockingQueue peekedEvents = ((ParallelGatewaySenderQueue) this.queue)
+				.getPeekedEvents();
+		if (logger.isDebugEnabled()) {
+			logger.debug("SKSKSK the peeked evets are for this processor  "
+					+ peekedEvents);
+			
+		}
+		
+//		synchronized (peekedEvents) {
+//			while(!peekedEvents.isEmpty()) {
+//				try {
+//					peekedEvents.wait();
+//				} catch (InterruptedException e) {
+//					e.printStackTrace();
+//				}
+//			}
+//		}
+		
+		if (logger.isDebugEnabled()) {
+			logger.debug("SKSKSK WAIT COMPLETE  "
+					+ peekedEvents);
+			
+		}
+		return peekedEvents.isEmpty();
+	}
+
+	public void markResumeWhenPeekedEventEmpty(boolean flag) {
+		this.resumeWhenPeekedEventsEmpty = flag;
+	}   
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 970afe3..0cbc741 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -22,6 +22,9 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,10 +39,6 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
-import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
@@ -47,7 +46,6 @@ import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.RegionQueue;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -71,7 +69,8 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
 
   protected static final Logger logger = LogService.getLogger();
   
-  protected ParallelGatewaySenderEventProcessor processors[];
+  protected List<ParallelGatewaySenderEventProcessor> processors;
+  
   //private final List<ConcurrentParallelGatewaySenderQueue> concurrentParallelQueues;
   private GemFireException ex = null;
   final int nDispatcher;
@@ -116,13 +115,13 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
   }
   
   protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
-    processors = new ParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
+    processors = new CopyOnWriteArrayList<ParallelGatewaySenderEventProcessor>();
     if (logger.isDebugEnabled()) {
       logger.debug("Creating AsyncEventProcessor");
     }
     for (int i = 0; i < sender.getDispatcherThreads(); i++) {
-      processors[i] = new ParallelGatewaySenderEventProcessor(sender,
-          targetRs, i, sender.getDispatcherThreads());
+      this.processors.add(new ParallelGatewaySenderEventProcessor(sender,
+          targetRs, i, sender.getDispatcherThreads()));
     }
   }
 
@@ -154,7 +153,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
     	return;
     }
     int pId = bucketId % this.nDispatcher;
-    this.processors[pId].enqueueEvent(operation, event, substituteValue);
+    this.processors.get(pId).enqueueEvent(operation, event, substituteValue);
     
    /* if (getSender().beforeEnque(gatewayQueueEvent)) {
       long start = getSender().getStatistics().startTime();
@@ -178,11 +177,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
   public void run() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
-    for(int i = 0; i < this.processors.length; i++){
+    for(ParallelGatewaySenderEventProcessor p:  this.processors){
+    	int i=0;
       if (isDebugEnabled) {
-        logger.debug("Starting the ParallelProcessors {}", i);
+        logger.debug("Starting the ParallelProcessors {}", i++);
       }
-      this.processors[i].start();
+      p.start();
     }
     try {
       waitForRunningStatus();
@@ -258,8 +258,8 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
       stopperCallables.add(new SenderStopperCallable(parallelProcessor));
     }
     
-    ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory);
-    try {
+    ExecutorService stopperService = Executors.newFixedThreadPool(processors.size(), threadFactory);
+    try {	
       List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables);
       for(Future<Boolean> f: futures) {
         try {
@@ -317,6 +317,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
   public void resumeDispatching() {
     for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
       parallelProcessor.resumeDispatching();
+      if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderEventProcessor: Resumed dispatching: {}", parallelProcessor);
+        }
     }
     super.resumeDispatching();
     if (logger.isDebugEnabled()) {
@@ -335,9 +338,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
    */
   public List<ParallelGatewaySenderEventProcessor> getProcessors() {
     List<ParallelGatewaySenderEventProcessor> l = new LinkedList<ParallelGatewaySenderEventProcessor>();
-    for (int i = 0; i < processors.length; i++) {
-      l.add(processors[i]);
-    }
+    l.addAll(processors);
     return l;
   }
 /*
@@ -360,7 +361,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
  
   @Override
   public GatewaySenderEventDispatcher getDispatcher() {
-    return this.processors[0].getDispatcher();//Suranjan is that fine??
+    return this.processors.get(0).getDispatcher();//Suranjan is that fine??
   }
 
   @Override
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 820c40c..0c3ddbb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -56,10 +56,10 @@ import com.gemstone.gemfire.internal.size.SingleObjectSizer;
  */
 public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
 
-  private final ParallelGatewaySenderEventProcessor processors[];
+  private final List<ParallelGatewaySenderEventProcessor> processors;
   
   public ConcurrentParallelGatewaySenderQueue(
-		  ParallelGatewaySenderEventProcessor pro[]) {
+		  List<ParallelGatewaySenderEventProcessor> pro) {
     this.processors = pro;
   }
   
@@ -79,15 +79,15 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
 
   @Override
   public Region getRegion() {
-	  return this.processors[0].getQueue().getRegion();
+	  return this.processors.get(0).getQueue().getRegion();
   }
   
   public PartitionedRegion getRegion(String fullpath) {
-    return processors[0].getRegion(fullpath);
+    return processors.get(0).getRegion(fullpath);
   }
   
   public Set<PartitionedRegion> getRegions() {
-	return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getRegions();
+	return ((ParallelGatewaySenderQueue)(processors.get(0).getQueue())).getRegions();
   }
 
   @Override
@@ -124,21 +124,21 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   @Override
   public int size() {
 	//is that fine??
-	return this.processors[0].getQueue().size();
+	return this.processors.get(0).getQueue().size();
   }
   
   public int localSize() {
-	return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).localSize();
+	return ((ParallelGatewaySenderQueue)(processors.get(0).getQueue())).localSize();
   }
 
   @Override
   public void addCacheListener(CacheListener listener) {
-	  this.processors[0].getQueue().addCacheListener(listener);    
+	  this.processors.get(0).getQueue().addCacheListener(listener);    
   }
 
   @Override
   public void removeCacheListener() {
-    this.processors[0].removeCacheListener();    
+    this.processors.get(0).removeCacheListener();    
   }
 
   @Override
@@ -152,8 +152,8 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   
   public long estimateMemoryFootprint(SingleObjectSizer sizer) {
 	long size = 0;
-	for(int i=0; i< processors.length; i++)
-	  size += ((ParallelGatewaySenderQueue)this.processors[i].getQueue()).estimateMemoryFootprint(sizer);
+	for(ParallelGatewaySenderEventProcessor p: this.processors)
+	  size += ((ParallelGatewaySenderQueue)p.getQueue()).estimateMemoryFootprint(sizer);
 	return size;
   }
 
@@ -165,20 +165,20 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   }*/
   
   public void removeShadowPR(String prRegionName) {
-  	for(int i =0; i< processors.length; i++){
-   	 processors[i].removeShadowPR(prRegionName);
+	  for(ParallelGatewaySenderEventProcessor p: this.processors){
+		  p.removeShadowPR(prRegionName);
     }
   }
   
   public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
-	for(int i =0; i< processors.length; i++){
-	  processors[i].addShadowPartitionedRegionForUserPR(pr);
+	  for(ParallelGatewaySenderEventProcessor p: this.processors){
+	  p.addShadowPartitionedRegionForUserPR(pr);
 	 }
   }
   
   private ParallelGatewaySenderEventProcessor getPGSProcessor(int bucketId) {
-  	int index = bucketId % this.processors.length;
-  	return processors[index];
+  	int index = bucketId % this.processors.size();
+  	return processors.get(index);
   }
 
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
@@ -197,11 +197,12 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   public void clear(PartitionedRegion pr, int bucketId) {
   	getPGSProcessor(bucketId).clear(pr, bucketId);
   }
-  
-  public void cleanUp() {
-	for(int i=0; i< processors.length; i++)
-	  ((ParallelGatewaySenderQueue)this.processors[i].getQueue()).cleanUp();
-  }
+
+	public void cleanUp() {
+		for (ParallelGatewaySenderEventProcessor p : this.processors) {
+			((ParallelGatewaySenderQueue) p.getQueue()).cleanUp();
+		}
+	}
   
   public void conflateEvent(Conflatable conflatableObject, int bucketId,
       Long tailKey) {
@@ -214,16 +215,16 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   }
   
   public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
-	for(int i =0; i< processors.length; i++){
-  	 processors[i].addShadowPartitionedRegionForUserRR(userRegion);;
+	  for(ParallelGatewaySenderEventProcessor p: this.processors){
+  	 p.addShadowPartitionedRegionForUserRR(userRegion);;
    }
   }
   
   public long getNumEntriesInVMTestOnly() {
-	return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getNumEntriesInVMTestOnly();
+	return ((ParallelGatewaySenderQueue)(processors.get(0).getQueue())).getNumEntriesInVMTestOnly();
   }
 	 
   public long getNumEntriesOverflowOnDiskTestOnly() {
-	return ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getNumEntriesOverflowOnDiskTestOnly();
+	return ((ParallelGatewaySenderQueue)(processors.get(0).getQueue())).getNumEntriesOverflowOnDiskTestOnly();
   }
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index f8780f4..9bc3ae8 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -20,7 +20,9 @@
 package com.gemstone.gemfire.internal.cache.wan.parallel;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -32,6 +34,8 @@ import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
 import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DistributedRegion;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
@@ -66,6 +70,7 @@ public class ParallelGatewaySenderEventProcessor extends
   
   final int index; 
   final int nDispatcher;
+  Set<Integer> buckets;
 
   protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) {
     super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_"
@@ -73,10 +78,28 @@ public class ParallelGatewaySenderEventProcessor extends
         "Event Processor for GatewaySender_" + sender.getId(), sender);
     this.index = 0;
     this.nDispatcher = 1;
+    buckets = Collections.emptySet();
     initializeMessageQueue(sender.getId());
     setDaemon(true);
   }
   
+  public synchronized void  setBuckets(Set<Integer> s) {
+	  if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderEventProcessor before setting: {} , {}", s, this.buckets);
+	  }
+	  this.buckets.clear();
+	  this.buckets.addAll(s);
+	  if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderEventProcessor after settingn: {} , {}", s, this.buckets);
+	  }
+	  
+	  ((ParallelGatewaySenderQueue)this.queue).setBuckets(this.buckets);
+	  ((ParallelGatewaySenderQueue)this.queue).setSingleHop();
+  }
+  
+  public Set<Integer> getBuckets() {
+	  return this.buckets;
+  }
   /**
    * use in concurrent scenario where queue is to be shared among all the processors.
    */
@@ -87,6 +110,11 @@ public class ParallelGatewaySenderEventProcessor extends
     this.index = id;
     this.nDispatcher = nDispatcher;
     //this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
+    //SURANJAN for the time being assume 113, it should be totalnumbuckets in the region
+    this.buckets = new HashSet<Integer>();
+    for(int i=index;i< 113; i = i+nDispatcher) {
+      this.buckets.add(i);
+    }
     initializeMessageQueue(sender.getId());
     setDaemon(true);
   }
@@ -107,7 +135,7 @@ public class ParallelGatewaySenderEventProcessor extends
     if (sender.getIsHDFSQueue())
       this.queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
     else
-      this.queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+      this.queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher, this.buckets);
     
     if(((ParallelGatewaySenderQueue)queue).localSize() > 0) {
       ((ParallelGatewaySenderQueue)queue).notifyEventProcessorIfRequired();
@@ -238,4 +266,11 @@ public class ParallelGatewaySenderEventProcessor extends
     }
     this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
   }
+
+  @Override
+  public void setPrimaryLocations(Map<ServerLocation, Set<Integer>> locations) {
+	  this.sender.setPrimaryLocations(locations);
+  }
+  
+
 }
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 882a99a..170a965 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -59,11 +59,13 @@ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.BucketRegionQueue;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 import com.gemstone.gemfire.internal.cache.ColocationHelper;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DiskRegionStats;
@@ -142,7 +144,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   //TODO:REF: name change for thread, as it appears in the log
   private static BatchRemovalThread removalThread = null;
 
-  protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>();
+  private BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>();
   
   public final AbstractGatewaySender sender ;
   
@@ -212,7 +214,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   
   final protected int index; 
   final protected int nDispatcher;
-  
+  protected Set <Integer> buckets;
   /**
    * A transient queue to maintain the eventSeqNum of the events that are to be
    * sent to remote site. It is cleared when the queue is cleared.
@@ -220,9 +222,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   //private final BlockingQueue<Long> eventSeqNumQueue;  
   
   public ParallelGatewaySenderQueue(AbstractGatewaySender sender,
-      Set<Region> userRegions, int idx, int nDispatcher) {
-    this.index = idx;
+      Set<Region> userRegions, int idx, int nDispatcher, Set<Integer> buckets) {
+    
+	this.index = idx;
     this.nDispatcher = nDispatcher;
+    this.buckets = buckets;
     this.stats = sender.getStatistics();
     this.sender = (AbstractGatewaySender)sender;
     
@@ -930,6 +934,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   
   // TODO: Suranjan Find optimal way to get Random shadow pr as this will be called in each put and peek.
   protected PartitionedRegion getRandomShadowPR() {
+	   if (logger.isDebugEnabled()) {
+	          logger.debug("KKKK getRandomPrimaryBucket: {}",userRegionNameToshadowPRMap);
+	   }
     PartitionedRegion prQ = null;
     if (this.userRegionNameToshadowPRMap.values().size() > 0) {
       int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size());
@@ -992,23 +999,34 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   }
   
   private int pickBucketId;
-  
+
+  private boolean singleHop;
+
   protected int getRandomPrimaryBucket(PartitionedRegion prQ) {
     if (prQ != null) {
       Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets();
       List<Integer> thisProcessorBuckets = new ArrayList<Integer>();
-      
       for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) {
         BucketRegion bucket = bucketEntry.getValue();
         if (bucket.getBucketAdvisor().isPrimary()) {
           int bId = bucket.getId();
-          if (bId % this.nDispatcher == this.index) {
-            thisProcessorBuckets.add(bId);
+          if(this.buckets.isEmpty() && !singleHop){
+        	if (bId % this.nDispatcher == this.index) {
+        	  thisProcessorBuckets.add(bId);
+        	}
+          }else {
+        	  if(this.buckets.contains(bId)) {
+        		thisProcessorBuckets.add(bId);
+              }
           }
         }
       }
       
       if (logger.isDebugEnabled()) {
+          logger.debug("getRandomPrimaryBucket: allocated {} for this processor", this.buckets);
+        }      
+      
+      if (logger.isDebugEnabled()) {
         logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", allBuckets.size(), thisProcessorBuckets.size());
       }           
       
@@ -1039,6 +1057,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return -1;
   }
   
+	public Set<Integer> getAllBucket() {
+		PartitionedRegion prQ = getRandomShadowPR();
+		
+		if (prQ != null) {
+			Set<Integer> s = new HashSet<Integer>();
+			for(int i=0;i<prQ.getTotalNumberOfBuckets(); i++) {
+				s.add(i);
+			}
+			return s;
+		}
+		return Collections.EMPTY_SET;
+	}
+  
   @Override
   public List take(int batchSize) throws CacheException, InterruptedException {
     //merge42180
@@ -1097,6 +1128,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         event.release();
       }
     }
+    else {
+			synchronized (peekedEvents) {
+				if (peekedEvents.isEmpty()) {
+					peekedEvents.notifyAll();
+				}
+			}
+		}
   }
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId,
@@ -1247,6 +1285,36 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     throw new UnsupportedOperationException();
   }
 
+	public List<GatewaySenderEventImpl> peekAlreadyPeekedEvents() {
+		List batch = new ArrayList();
+		if (this.resetLastPeeked) {
+			batch.addAll(peekedEvents);
+			this.resetLastPeeked = false;
+			final boolean isDebugEnabled = logger.isDebugEnabled();
+
+			if (isDebugEnabled) {
+				StringBuffer buffer = new StringBuffer();
+				for (GatewaySenderEventImpl ge : peekedEvents) {
+					buffer.append("event :");
+					buffer.append(ge);
+				}
+				logger.debug("Adding already peeked events to the batch {}",
+						buffer);
+			}
+		}
+		else {
+			// ideally block
+			try {
+				Thread.sleep(200);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+		
+		
+		return batch;
+	}
+  
   public List peek(int batchSize, int timeToWait) throws InterruptedException,
       CacheException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1278,6 +1346,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         logger.debug("Adding already peeked events to the batch {}", buffer);
       }
     }
+    // return batch if we only want peeked events.
     
     int bId = -1;
     while (batch.size() < batchSize) {
@@ -1298,8 +1367,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           }
           batch.add(object);
           peekedEvents.add(object);
-          BucketRegionQueue brq = ((BucketRegionQueue)prQ
-              .getDataStore().getLocalBucketById(bId));
+//          BucketRegionQueue brq = ((BucketRegionQueue)prQ
+//              .getDataStore().getLocalBucketById(bId));
           
           //brq.doLockForPrimary(false);
           
@@ -1429,8 +1498,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         Set<BucketRegion> primaryBuckets = ((PartitionedRegion)prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
         
         for (BucketRegion br : primaryBuckets) {
-          if(br.getId() % this.nDispatcher == this.index)
-            size += br.size();
+          int bId = br.getId();
+          if(this.buckets.isEmpty() && !singleHop){
+        	if (bId % this.nDispatcher == this.index) {
+        	  size += br.size();
+        	}
+          }else {//single hop case
+        	if(this.buckets.contains(bId)) {
+        	  size += br.size();
+            }
+          }
         }         
       }
       if (logger.isDebugEnabled()) {
@@ -1473,6 +1550,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (int i = 0; i < batchSize; i++) {
       remove();
     }
+	synchronized (peekedEvents) {
+		if (peekedEvents.isEmpty()) {
+			peekedEvents.notifyAll();
+		}
+	}
   }
   
   public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
@@ -1861,4 +1943,22 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException{
   	throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
   }
+
+	public void setBuckets(Set<Integer> buckets2) {
+		this.buckets = buckets2;
+		if (logger.isDebugEnabled()) {
+	          logger.debug("ParallelQueue Settign buckets : {} , {}", buckets2, this.buckets);
+	    }
+		
+	}
+
+	public BlockingQueue<GatewaySenderEventImpl> getPeekedEvents() {
+		return peekedEvents;
+	}
+
+	public void setSingleHop() {
+		this.singleHop = true;
+	}
+
+	
 }