You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2017/05/15 19:03:41 UTC

[2/2] geode git commit: GEODE-2900: push shadow key back into the front of the eventSeqNumber "Queue"

GEODE-2900: push shadow key back into the front of the eventSeqNumber "Queue"


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/161a9a08
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/161a9a08
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/161a9a08

Branch: refs/heads/develop
Commit: 161a9a085a6b354e4e64b275346a680b7addfa38
Parents: 9a1aedd
Author: Jason Huynh <hu...@gmail.com>
Authored: Fri May 12 16:50:02 2017 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon May 15 12:03:24 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegionQueue.java | 38 +++++++++++--------
 .../internal/cache/StateFlushOperation.java     | 40 ++++++++++----------
 .../parallel/ParallelGatewaySenderQueue.java    |  6 ++-
 .../geode/internal/cache/GIIDeltaDUnitTest.java |  3 +-
 4 files changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..e9a74e7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -27,8 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,10 +73,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   private final Map indexes;
 
   /**
-   * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote site.
-   * It is cleared when the queue is cleared.
+   * A transient deque that should be treated as a FIFO queue. It maintains the eventSeqNum of
+   * the events that are to be sent to the remote site. It is cleared when the queue is cleared.
    */
-  private final BlockingQueue<Object> eventSeqNumQueue = new LinkedBlockingQueue<Object>();
+  private final BlockingDeque<Object> eventSeqNumDeque = new LinkedBlockingDeque<Object>();
 
   // private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new
   // LinkedBlockingQueue<EventID>();
@@ -139,7 +141,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
             }
           });
           for (EventID eventID : keys) {
-            eventSeqNumQueue.add(eventID);
+            eventSeqNumDeque.addLast(eventID);
           }
         } else {
           TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
@@ -150,7 +152,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
           // fix for #49679 NoSuchElementException thrown from BucketRegionQueue.initialize
           if (!sortedKeys.isEmpty()) {
             for (Long key : sortedKeys) {
-              eventSeqNumQueue.add(key);
+              eventSeqNumDeque.addLast(key);
             }
             lastKeyRecovered = sortedKeys.last();
             if (this.getEventSeqNum() != null) {
@@ -162,7 +164,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "For bucket {} ,total keys recovered are : {} last key recovered is : {} and the seqNo is ",
-              getId(), eventSeqNumQueue.size(), lastKeyRecovered, getEventSeqNum());
+              getId(), eventSeqNumDeque.size(), lastKeyRecovered, getEventSeqNum());
         }
       }
       this.initialized = true;
@@ -211,7 +213,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   @Override
   public void beforeAcquiringPrimaryState() {
     int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
-    Iterator<Object> itr = eventSeqNumQueue.iterator();
+    Iterator<Object> itr = eventSeqNumDeque.iterator();
     markEventsAsDuplicate(batchSize, itr);
   }
 
@@ -224,7 +226,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       }
     });
     this.indexes.clear();
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
   }
 
   @Override
@@ -236,7 +238,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         result.set(BucketRegionQueue.super.clearEntries(rvv));
       }
     });
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
     return result.get();
   }
 
@@ -250,7 +252,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     getInitializationLock().writeLock().lock();
     try {
       this.indexes.clear();
-      this.eventSeqNumQueue.clear();
+      this.eventSeqNumDeque.clear();
     } finally {
       getInitializationLock().writeLock().unlock();
     }
@@ -377,7 +379,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       if (logger.isDebugEnabled()) {
         logger.debug(" removing the key {} from eventSeqNumQueue", event.getKey());
       }
-      this.eventSeqNumQueue.remove(event.getKey());
+      this.eventSeqNumDeque.remove(event.getKey());
     }
   }
 
@@ -412,7 +414,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      key = this.eventSeqNumQueue.peek();
+      key = this.eventSeqNumDeque.peekFirst();
       if (key != null) {
         object = optimalGet(key);
         if (object == null && !this.getPartitionedRegion().isConflationEnabled()) {
@@ -431,7 +433,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         // RegionQueue[1])[0];
         // //queue.addToPeekedKeys(key);
         // }
-        this.eventSeqNumQueue.remove(key);
+        this.eventSeqNumDeque.remove(key);
       }
       return object; // OFFHEAP: ok since callers are careful to do destroys on
                      // region queue after finished with peeked object.
@@ -443,7 +445,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
     if (didPut) {
       if (this.initialized) {
-        this.eventSeqNumQueue.add(key);
+        this.eventSeqNumDeque.addLast(key);
         updateLargestQueuedKey((Long) key);
       }
       if (logger.isDebugEnabled()) {
@@ -456,6 +458,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     }
   }
 
+  public void pushKeyIntoQueue(Object key) {
+    eventSeqNumDeque.addFirst(key);
+  }
+
   private void updateLargestQueuedKey(Long key) {
     Atomics.setIfGreater(this.latestQueuedKey, key);
   }
@@ -510,7 +516,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
    * @throws ForceReattemptException
    */
   public Object remove() throws ForceReattemptException {
-    Object key = this.eventSeqNumQueue.remove();
+    Object key = this.eventSeqNumDeque.removeFirst();
     if (key != null) {
       destroyKey(key);
     }
@@ -586,7 +592,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
 
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
-        && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
+        && !this.eventSeqNumDeque.isEmpty() && getBucketAdvisor().isPrimary();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index eb93b76..8ffdf06 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -52,15 +52,15 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
  * a point in time. Currently this is fixed at the time the member using this operation exchanged
  * profiles with other users of the Region, and is useful only for ensuring consistency for
  * InitialImageOperation.
- * 
+ *
  * StateFlushOperation works with distribution advisors and with the membership manager to flush
  * cache operations from threads to communications channels and then from the communications
  * channels to the cache of the member selected to be an initial image provider.
- * 
+ *
  * To make an operation subject to StateFlushOperation you must encapsulate the message part of the
  * operation (prior to asking for distribution advice) in a try/finally block. The try/finally block
  * must work with the distribution manager like this:
- * 
+ *
  * <pre>
  * try {
  *   long version = advisor.startOperation();
@@ -74,22 +74,22 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
  *   }
  * }
  * </pre>
- * 
+ *
  * On the receiving side the messaging system will look at the result of invoking
  * containsCacheContentChange() on the message. If the message does not return true from this
  * message then state-flush will not wait for it to be applied to the cache before GII starts.
- * 
+ *
  * <pre>
  * \@Override
  * public boolean containsCacheContentChange() {
  *   return true;
  * }
  * </pre>
- * 
+ *
  * The messaging infrastructure will handle the rest for you. For examples look at the uses of
  * startOperation() and endOperation(). There are some complex examples in transaction processing
  * and a more straightforward example in DistributedCacheOperation.
- * 
+ *
  * @since GemFire 5.0.1
  */
 public class StateFlushOperation {
@@ -108,7 +108,7 @@ public class StateFlushOperation {
     boolean initialized = r.isInitialized();
     if (initialized) {
       r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so we can track
-                                                              // current ops
+      // current ops
       try {
         r.getDistributionAdvisor().waitForCurrentOperations();
       } catch (RegionDestroyedException e) {
@@ -153,7 +153,7 @@ public class StateFlushOperation {
 
   /**
    * Constructor for StateFlushOperation
-   * 
+   *
    * @param r The region whose state is to be flushed
    */
   public StateFlushOperation(DistributedRegion r) {
@@ -163,7 +163,7 @@ public class StateFlushOperation {
 
   /**
    * Constructor for StateFlushOperation for flushing all regions
-   * 
+   *
    * @param dm the distribution manager to use in distributing the operation
    */
   public StateFlushOperation(DM dm) {
@@ -173,7 +173,7 @@ public class StateFlushOperation {
 
   /**
    * flush state to the given target
-   * 
+   *
    * @param recipients The members who may be making state changes to the region. This is typically
    *        taken from a CacheDistributionAdvisor membership set
    * @param target The member who should have all state flushed to it
@@ -187,7 +187,7 @@ public class StateFlushOperation {
    * @return true if the state was flushed, false if not
    */
   public boolean flush(Set recipients, DistributedMember target, int processorType,
-      boolean flushNewOps) throws InterruptedException {
+                       boolean flushNewOps) throws InterruptedException {
 
     Set recips = recipients; // do not use recipients parameter past this point
     if (Thread.interrupted()) {
@@ -260,16 +260,16 @@ public class StateFlushOperation {
    * sent to all members holding the region, and has the effect of causing those members to send a
    * serial distribution message (a StateStabilizationMessage) to the image provider. The provider
    * then sends a reply message back to this process on behalf of the member receiving the .
-   * 
+   *
    * <pre>
    * requestor ----> member1 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
    *           ----> member2 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
    *           ----> provider --StateStabilizedMessage--> requestor
    * </pre>
-   * 
+   *
    * This flushes the ordered messages in flight between members and the gii provider, so we don't
    * miss data when the image is requested.
-   * 
+   *
    * @since GemFire 5.0.1
    * @see StateFlushOperation.StateStabilizationMessage
    * @see StateFlushOperation.StateStabilizedMessage
@@ -395,8 +395,8 @@ public class StateFlushOperation {
               if (initialized) {
                 if (this.flushNewOps) {
                   r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so
-                                                                          // we can track current
-                                                                          // ops
+                  // we can track current
+                  // ops
                 }
                 try {
                   r.getDistributionAdvisor().waitForCurrentOperations();
@@ -503,7 +503,7 @@ public class StateFlushOperation {
    * StateStabilizationMessage when all state has been flushed to it.
    * <p>
    * author bruce
-   * 
+   *
    * @see StateFlushOperation.StateStabilizedMessage
    * @see StateFlushOperation.StateMarkerMessage
    * @since GemFire 5.0.1
@@ -650,7 +650,7 @@ public class StateFlushOperation {
    * before the initial image is requested.
    * <p>
    * author bruce
-   * 
+   *
    * @see StateFlushOperation.StateMarkerMessage
    * @see StateFlushOperation.StateStabilizationMessage
    * @since GemFire 5.0.1
@@ -740,7 +740,7 @@ public class StateFlushOperation {
       this.targetMember = (InternalDistributedMember) target;
       this.originalCount = initMembers.size();
       this.targetMemberHasLeft = targetMemberHasLeft // bug #43583 - perform an initial membership
-                                                     // check
+          // check
           || !manager.isCurrentMember((InternalDistributedMember) target);
     }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..9b55abb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -1357,6 +1355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         final PartitionedRegion region = (PartitionedRegion) event.getRegion();
         if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
           iterator.remove();
+          BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+          if (brq != null) {
+            brq.pushKeyIntoQueue(event.getShadowKey());
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
index bfa30c3..e80b82d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
@@ -771,7 +771,8 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
     // now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6
     waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
     waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
-
+    P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
+    
     // restart and gii, R's rvv should be the same as P's
     checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
     createDistributedRegion(R);