You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2017/05/15 19:03:41 UTC
[2/2] geode git commit: GEODE-2900: push shadow key back into the
front of the eventSeqNumber "Queue"
GEODE-2900: push shadow key back into the front of the eventSeqNumber "Queue"
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/161a9a08
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/161a9a08
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/161a9a08
Branch: refs/heads/develop
Commit: 161a9a085a6b354e4e64b275346a680b7addfa38
Parents: 9a1aedd
Author: Jason Huynh <hu...@gmail.com>
Authored: Fri May 12 16:50:02 2017 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon May 15 12:03:24 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegionQueue.java | 38 +++++++++++--------
.../internal/cache/StateFlushOperation.java | 40 ++++++++++----------
.../parallel/ParallelGatewaySenderQueue.java | 6 ++-
.../geode/internal/cache/GIIDeltaDUnitTest.java | 3 +-
4 files changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..e9a74e7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -27,8 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,10 +73,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
private final Map indexes;
/**
- * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote site.
- * It is cleared when the queue is cleared.
+ * A transient deque, but should be treated like as a fifo queue to maintain the eventSeqNum of
+ * the events that are to be sent to remote site. It is cleared when the queue is cleared.
*/
- private final BlockingQueue<Object> eventSeqNumQueue = new LinkedBlockingQueue<Object>();
+ private final BlockingDeque<Object> eventSeqNumDeque = new LinkedBlockingDeque<Object>();
// private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new
// LinkedBlockingQueue<EventID>();
@@ -139,7 +141,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
});
for (EventID eventID : keys) {
- eventSeqNumQueue.add(eventID);
+ eventSeqNumDeque.addLast(eventID);
}
} else {
TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
@@ -150,7 +152,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
// fix for #49679 NoSuchElementException thrown from BucketRegionQueue.initialize
if (!sortedKeys.isEmpty()) {
for (Long key : sortedKeys) {
- eventSeqNumQueue.add(key);
+ eventSeqNumDeque.addLast(key);
}
lastKeyRecovered = sortedKeys.last();
if (this.getEventSeqNum() != null) {
@@ -162,7 +164,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(
"For bucket {} ,total keys recovered are : {} last key recovered is : {} and the seqNo is ",
- getId(), eventSeqNumQueue.size(), lastKeyRecovered, getEventSeqNum());
+ getId(), eventSeqNumDeque.size(), lastKeyRecovered, getEventSeqNum());
}
}
this.initialized = true;
@@ -211,7 +213,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
@Override
public void beforeAcquiringPrimaryState() {
int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
- Iterator<Object> itr = eventSeqNumQueue.iterator();
+ Iterator<Object> itr = eventSeqNumDeque.iterator();
markEventsAsDuplicate(batchSize, itr);
}
@@ -224,7 +226,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
});
this.indexes.clear();
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
}
@Override
@@ -236,7 +238,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
result.set(BucketRegionQueue.super.clearEntries(rvv));
}
});
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
return result.get();
}
@@ -250,7 +252,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
getInitializationLock().writeLock().lock();
try {
this.indexes.clear();
- this.eventSeqNumQueue.clear();
+ this.eventSeqNumDeque.clear();
} finally {
getInitializationLock().writeLock().unlock();
}
@@ -377,7 +379,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(" removing the key {} from eventSeqNumQueue", event.getKey());
}
- this.eventSeqNumQueue.remove(event.getKey());
+ this.eventSeqNumDeque.remove(event.getKey());
}
}
@@ -412,7 +414,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
if (this.getPartitionedRegion().isDestroyed()) {
throw new BucketRegionQueueUnavailableException();
}
- key = this.eventSeqNumQueue.peek();
+ key = this.eventSeqNumDeque.peekFirst();
if (key != null) {
object = optimalGet(key);
if (object == null && !this.getPartitionedRegion().isConflationEnabled()) {
@@ -431,7 +433,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
// RegionQueue[1])[0];
// //queue.addToPeekedKeys(key);
// }
- this.eventSeqNumQueue.remove(key);
+ this.eventSeqNumDeque.remove(key);
}
return object; // OFFHEAP: ok since callers are careful to do destroys on
// region queue after finished with peeked object.
@@ -443,7 +445,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
if (didPut) {
if (this.initialized) {
- this.eventSeqNumQueue.add(key);
+ this.eventSeqNumDeque.addLast(key);
updateLargestQueuedKey((Long) key);
}
if (logger.isDebugEnabled()) {
@@ -456,6 +458,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
}
+ public void pushKeyIntoQueue(Object key) {
+ eventSeqNumDeque.addFirst(key);
+ }
+
private void updateLargestQueuedKey(Long key) {
Atomics.setIfGreater(this.latestQueuedKey, key);
}
@@ -510,7 +516,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
* @throws ForceReattemptException
*/
public Object remove() throws ForceReattemptException {
- Object key = this.eventSeqNumQueue.remove();
+ Object key = this.eventSeqNumDeque.removeFirst();
if (key != null) {
destroyKey(key);
}
@@ -586,7 +592,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
public boolean isReadyForPeek() {
return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
- && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
+ && !this.eventSeqNumDeque.isEmpty() && getBucketAdvisor().isPrimary();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index eb93b76..8ffdf06 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -52,15 +52,15 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
* a point in time. Currently this is fixed at the time the member using this operation exchanged
* profiles with other users of the Region, and is useful only for ensuring consistency for
* InitialImageOperation.
- *
+ *
* StateFlushOperation works with distribution advisors and with the membership manager to flush
* cache operations from threads to communications channels and then from the communications
* channels to the cache of the member selected to be an initial image provider.
- *
+ *
* To make an operation subject to StateFlushOperation you must encapsulate the message part of the
* operation (prior to asking for distribution advice) in a try/finally block. The try/finally block
* must work with the distribution manager like this:
- *
+ *
* <pre>
* try {
* long version = advisor.startOperation();
@@ -74,22 +74,22 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
* }
* }
* </pre>
- *
+ *
* On the receiving side the messaging system will look at the result of invoking
* containsCacheContentChange() on the message. If the message does not return true from this
* message then state-flush will not wait for it to be applied to the cache before GII starts.
- *
+ *
* <pre>
* \@Override
* public boolean containsCacheContentChange() {
* return true;
* }
* </pre>
- *
+ *
* The messaging infrastructure will handle the rest for you. For examples look at the uses of
* startOperation() and endOperation(). There are some complex examples in transaction processing
* and a more straightforward example in DistributedCacheOperation.
- *
+ *
* @since GemFire 5.0.1
*/
public class StateFlushOperation {
@@ -108,7 +108,7 @@ public class StateFlushOperation {
boolean initialized = r.isInitialized();
if (initialized) {
r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so we can track
- // current ops
+ // current ops
try {
r.getDistributionAdvisor().waitForCurrentOperations();
} catch (RegionDestroyedException e) {
@@ -153,7 +153,7 @@ public class StateFlushOperation {
/**
* Constructor for StateFlushOperation
- *
+ *
* @param r The region whose state is to be flushed
*/
public StateFlushOperation(DistributedRegion r) {
@@ -163,7 +163,7 @@ public class StateFlushOperation {
/**
* Constructor for StateFlushOperation for flushing all regions
- *
+ *
* @param dm the distribution manager to use in distributing the operation
*/
public StateFlushOperation(DM dm) {
@@ -173,7 +173,7 @@ public class StateFlushOperation {
/**
* flush state to the given target
- *
+ *
* @param recipients The members who may be making state changes to the region. This is typically
* taken from a CacheDistributionAdvisor membership set
* @param target The member who should have all state flushed to it
@@ -187,7 +187,7 @@ public class StateFlushOperation {
* @return true if the state was flushed, false if not
*/
public boolean flush(Set recipients, DistributedMember target, int processorType,
- boolean flushNewOps) throws InterruptedException {
+ boolean flushNewOps) throws InterruptedException {
Set recips = recipients; // do not use recipients parameter past this point
if (Thread.interrupted()) {
@@ -260,16 +260,16 @@ public class StateFlushOperation {
* sent to all members holding the region, and has the effect of causing those members to send a
* serial distribution message (a StateStabilizationMessage) to the image provider. The provider
* then sends a reply message back to this process on behalf of the member receiving the .
- *
+ *
* <pre>
* requestor ----> member1 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
* ----> member2 --StateStabilizationMessage--> provider --StateStabilizedMessage--> requestor
* ----> provider --StateStabilizedMessage--> requestor
* </pre>
- *
+ *
* This flushes the ordered messages in flight between members and the gii provider, so we don't
* miss data when the image is requested.
- *
+ *
* @since GemFire 5.0.1
* @see StateFlushOperation.StateStabilizationMessage
* @see StateFlushOperation.StateStabilizedMessage
@@ -395,8 +395,8 @@ public class StateFlushOperation {
if (initialized) {
if (this.flushNewOps) {
r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so
- // we can track current
- // ops
+ // we can track current
+ // ops
}
try {
r.getDistributionAdvisor().waitForCurrentOperations();
@@ -503,7 +503,7 @@ public class StateFlushOperation {
* StateStabilizationMessage when all state has been flushed to it.
* <p>
* author bruce
- *
+ *
* @see StateFlushOperation.StateStabilizedMessage
* @see StateFlushOperation.StateMarkerMessage
* @since GemFire 5.0.1
@@ -650,7 +650,7 @@ public class StateFlushOperation {
* before the initial image is requested.
* <p>
* author bruce
- *
+ *
* @see StateFlushOperation.StateMarkerMessage
* @see StateFlushOperation.StateStabilizationMessage
* @since GemFire 5.0.1
@@ -740,7 +740,7 @@ public class StateFlushOperation {
this.targetMember = (InternalDistributedMember) target;
this.originalCount = initMembers.size();
this.targetMemberHasLeft = targetMemberHasLeft // bug #43583 - perform an initial membership
- // check
+ // check
|| !manager.isCurrentMember((InternalDistributedMember) target);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..9b55abb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
@@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -1357,6 +1355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
final PartitionedRegion region = (PartitionedRegion) event.getRegion();
if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
iterator.remove();
+ BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+ if (brq != null) {
+ brq.pushKeyIntoQueue(event.getShadowKey());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/161a9a08/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
index bfa30c3..e80b82d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
@@ -771,7 +771,8 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
// now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6
waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
-
+ P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
+
// restart and gii, R's rvv should be the same as P's
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
createDistributedRegion(R);