You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/05/03 20:53:23 UTC
[09/10] geode git commit: GEODE-2847: Get correct version tags for
retried bulk operation
GEODE-2847: Get correct version tags for retried bulk operation
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/16832655
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/16832655
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/16832655
Branch: refs/heads/feature/GEM-1353
Commit: 16832655d18592ddaf3c89979be30e5e7caa10f1
Parents: 0dd2552
Author: eshu <es...@pivotal.io>
Authored: Wed May 3 10:14:08 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Wed May 3 10:14:08 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/EventTracker.java | 117 +++++++------------
.../geode/internal/cache/LocalRegion.java | 24 ++--
.../cache/partitioned/PutAllPRMessage.java | 2 +-
.../cache/partitioned/RemoveAllPRMessage.java | 2 +-
.../tier/sockets/ClientProxyMembershipID.java | 2 +-
.../AbstractDistributedRegionJUnitTest.java | 15 ++-
.../cache/DistributedRegionJUnitTest.java | 54 ++++++++-
7 files changed, 120 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
index 2ddfdc4..278367c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
@@ -59,14 +59,12 @@ public class EventTracker {
new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100);
/**
- * a mapping of originator to bulkOp's last status (true means finished processing) applied to
- * this cache.
+ * a mapping of originator to bulkOps
*
- * Keys are instances of @link {@link ThreadIdentifier}, values are instances of
- * {@link BulkOpProcessed}.
+ * Keys are instances of @link {@link ThreadIdentifier}
*/
- private final ConcurrentMap<ThreadIdentifier, BulkOpProcessed> recordedBulkOps =
- new ConcurrentHashMap<ThreadIdentifier, BulkOpProcessed>(100);
+ private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
+ new ConcurrentHashMap<ThreadIdentifier, Object>(100);
/**
* a mapping of originator to bulkOperation's last version tags. This map differs from
@@ -141,7 +139,7 @@ public class EventTracker {
public EventTracker(LocalRegion region) {
this.cache = region.cache;
this.name = "Event Tracker for " + region.getName();
- this.initializationLatch = new StoppableCountDownLatch(region.stopper, 1);
+ this.initializationLatch = new StoppableCountDownLatch(region.getStopper(), 1);
}
/** start this event tracker */
@@ -307,19 +305,22 @@ public class EventTracker {
}
}
+ EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
+ if (logger.isTraceEnabled()) {
+ logger.trace("region event tracker recording {}", event);
+ }
+ recordSeqno(membershipID, newEvh);
+
// If this is a bulkOp, and concurrency checks are enabled, we need to
// save the version tag in case we retry.
- if (lr.concurrencyChecksEnabled
+ // Make recordBulkOp version tag after recordSeqno, so that recordBulkOpStart
+ // in a retry bulk op would not incorrectly remove the saved version tag in
+ // recordedBulkOpVersionTags
+ if (lr.getConcurrencyChecksEnabled()
&& (event.getOperation().isPutAll() || event.getOperation().isRemoveAll())
&& lr.getServerProxy() == null) {
recordBulkOpEvent(event, membershipID);
}
-
- EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
- if (logger.isTraceEnabled()) {
- logger.trace("region event tracker recording {}", event);
- }
- recordSeqno(membershipID, newEvh);
}
/**
@@ -542,24 +543,19 @@ public class EventTracker {
ThreadIdentifier membershipID =
new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
- BulkOpProcessed opSyncObj =
- recordedBulkOps.putIfAbsent(membershipID, new BulkOpProcessed(false));
- if (opSyncObj == null) {
- opSyncObj = recordedBulkOps.get(membershipID);
- }
+ Object opSyncObj = null;
+ do {
+ opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object());
+ if (opSyncObj == null) {
+ opSyncObj = recordedBulkOps.get(membershipID);
+ }
+ } while (opSyncObj == null);
+
synchronized (opSyncObj) {
try {
- if (opSyncObj.getStatus() && logger.isDebugEnabled()) {
- logger.debug("SyncBulkOp: The operation was performed by another thread.");
- } else {
- recordBulkOpStart(membershipID);
-
- // Perform the bulk op
- r.run();
- // set to true in case another thread is waiting at sync
- opSyncObj.setStatus(true);
- recordedBulkOps.remove(membershipID);
- }
+ recordBulkOpStart(membershipID, eventID);
+ // Perform the bulk op
+ r.run();
} finally {
recordedBulkOps.remove(membershipID);
}
@@ -567,14 +563,23 @@ public class EventTracker {
}
/**
- * Called when a bulkOp is started on the local region. Used to clear event tracker state from the
- * last bulkOp.
+ * Called when a new bulkOp is started on the local region. Used to clear event tracker state from
+ * the last bulkOp.
*/
- public void recordBulkOpStart(ThreadIdentifier tid) {
+ public void recordBulkOpStart(ThreadIdentifier tid, EventID eventID) {
if (logger.isDebugEnabled()) {
logger.debug("recording bulkOp start for {}", tid.expensiveToString());
}
- this.recordedBulkOpVersionTags.remove(tid);
+ EventSeqnoHolder evh = recordedEvents.get(tid);
+ if (evh == null) {
+ return;
+ }
+ synchronized (evh) {
+ // only remove it when a new bulk op occurs
+ if (eventID.getSequenceID() > evh.lastSeqno) {
+ this.recordedBulkOpVersionTags.remove(tid);
+ }
+ }
}
/**
@@ -660,50 +665,6 @@ public class EventTracker {
}
/**
- * A status tracker for each bulk operation (putAll or removeAll) from originators specified by
- * membershipID and threadID in the cache processed is true means the bulk op is processed by one
- * thread no need to redo it by other threads.
- *
- * @since GemFire 5.7
- */
- static class BulkOpProcessed {
- /** whether the op is processed */
- private boolean processed;
-
- /**
- * creates a new instance to save status of a bulk op
- *
- * @param status true if the op has been processed
- */
- BulkOpProcessed(boolean status) {
- this.processed = status;
- }
-
- /**
- * setter method to change the status
- *
- * @param status true if the op has been processed
- */
- void setStatus(boolean status) {
- this.processed = status;
- }
-
- /**
- * getter method to peek the current status
- *
- * @return current status
- */
- boolean getStatus() {
- return this.processed;
- }
-
- @Override
- public String toString() {
- return "BULKOP(" + this.processed + ")";
- }
- }
-
- /**
* A holder for the version tags generated for a bulk operation (putAll or removeAll). These
* version tags are retrieved when a bulk op is retried.
*
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8c061b0..2dec53b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -505,6 +505,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return new Stopper();
}
+ protected CancelCriterion getStopper() {
+ return this.stopper;
+ }
+
private final TestCallable testCallable;
/**
@@ -682,10 +686,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
/**
- * Test method for getting the event tracker.
*
- * this method is for testing only. Other region classes may track events using different
- * mechanisms than EventTrackers
+ * Other region classes may track events using different mechanisms than EventTrackers
*/
protected EventTracker getEventTracker() {
return this.eventTracker;
@@ -3475,6 +3477,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
+ protected boolean getEnableConcurrencyChecks() {
+ return this.concurrencyChecksEnabled;
+ }
+
/**
* validate attributes of subregion being created, sent to parent
*
@@ -6151,8 +6157,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
isDup = this.eventTracker.hasSeenEvent(event);
if (isDup) {
event.setPossibleDuplicate(true);
- if (this.concurrencyChecksEnabled && event.getVersionTag() == null) {
- event.setVersionTag(findVersionTagForClientEvent(event.getEventId()));
+ if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) {
+ if (event.isBulkOpInProgress()) {
+ event.setVersionTag(findVersionTagForClientBulkOp(event.getEventId()));
+ } else {
+ event.setVersionTag(findVersionTagForClientEvent(event.getEventId()));
+ }
}
} else {
// bug #48205 - a retried PR operation may already have a version assigned to it
@@ -6253,9 +6263,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- public void recordBulkOpStart(ThreadIdentifier membershipID) {
+ public void recordBulkOpStart(ThreadIdentifier membershipID, EventID eventID) {
if (this.eventTracker != null && !isTX()) {
- this.eventTracker.recordBulkOpStart(membershipID);
+ this.eventTracker.recordBulkOpStart(membershipID, eventID);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 27f5aa0..ed1fe0a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -438,7 +438,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply {
EventID eventID = putAllPRData[0].getEventID();
ThreadIdentifier membershipID =
new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
- bucketRegion.recordBulkOpStart(membershipID);
+ bucketRegion.recordBulkOpStart(membershipID, eventID);
}
bucketRegion.waitUntilLocked(keys);
boolean lockedForPrimary = false;
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index f4f6299..0e38ddc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -434,7 +434,7 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
EventID eventID = removeAllPRData[0].getEventID();
ThreadIdentifier membershipID =
new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
- bucketRegion.recordBulkOpStart(membershipID);
+ bucketRegion.recordBulkOpStart(membershipID, eventID);
}
bucketRegion.waitUntilLocked(keys);
boolean lockedForPrimary = false;
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..2fd508b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -38,7 +38,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
*
*
*/
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
implements DataSerializableFixedID, Serializable, Externalizable {
private static final Logger logger = LogService.getLogger();
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
index ba2f794..a8cbdde 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
@@ -106,7 +106,8 @@ public abstract class AbstractDistributedRegionJUnitTest {
protected abstract void verifyDistributeUpdateEntryVersion(DistributedRegion region,
EntryEventImpl event, int cnt);
- protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) {
+ protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled,
+ boolean testHasSeenEvent) {
GemFireCacheImpl cache = Fakes.cache();
// create region attributes and internal region arguments
@@ -122,14 +123,16 @@ public abstract class AbstractDistributedRegionJUnitTest {
}
doNothing().when(region).notifyGatewaySender(any(), any());
- doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+ if (!testHasSeenEvent) {
+ doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+ }
return region;
}
@Test
public void testConcurrencyFalseTagNull() {
// case 1: concurrencyCheckEanbled = false, version tag is null: distribute
- DistributedRegion region = prepare(false);
+ DistributedRegion region = prepare(false, false);
EntryEventImpl event = createDummyEvent(region);
assertNull(event.getVersionTag());
doTest(region, event, 1);
@@ -138,7 +141,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
@Test
public void testConcurrencyTrueTagNull() {
// case 2: concurrencyCheckEanbled = true, version tag is null: not to distribute
- DistributedRegion region = prepare(true);
+ DistributedRegion region = prepare(true, false);
EntryEventImpl event = createDummyEvent(region);
assertNull(event.getVersionTag());
doTest(region, event, 0);
@@ -147,7 +150,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
@Test
public void testConcurrencyTrueTagInvalid() {
// case 3: concurrencyCheckEanbled = true, version tag is invalid: not to distribute
- DistributedRegion region = prepare(true);
+ DistributedRegion region = prepare(true, false);
EntryEventImpl event = createDummyEvent(region);
VersionTag tag = createVersionTag(false);
event.setVersionTag(tag);
@@ -158,7 +161,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
@Test
public void testConcurrencyTrueTagValid() {
// case 4: concurrencyCheckEanbled = true, version tag is valid: distribute
- DistributedRegion region = prepare(true);
+ DistributedRegion region = prepare(true, false);
EntryEventImpl event = createDummyEvent(region);
VersionTag tag = createVersionTag(true);
event.setVersionTag(tag);
http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 7525f35..ce21c67 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,15 +14,23 @@
*/
package org.apache.geode.internal.cache;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.*;
-import org.junit.experimental.categories.Category;
+import java.util.concurrent.ConcurrentMap;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.EventTracker.BulkOpHolder;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -99,5 +107,47 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
}
}
+ @Test
+ public void retriedBulkOpGetsSavedVersionTag() {
+ DistributedRegion region = prepare(true, true);
+ DistributedMember member = mock(DistributedMember.class);
+ ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class);
+ doReturn(false).when(region).isUsedForPartitionedRegionBucket();
+
+ byte[] memId = {1, 2, 3};
+ long threadId = 1;
+ long retrySeqId = 1;
+ ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
+ EventID retryEventID = new EventID(memId, threadId, retrySeqId);
+ boolean skipCallbacks = true;
+ int size = 2;
+ recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId, size);
+ EventTracker eventTracker = region.getEventTracker();
+
+ ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags();
+ BulkOpHolder holder = map.get(tid);
+
+ EntryEventImpl retryEvent = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key1",
+ "value1", null, false, member, !skipCallbacks, retryEventID);
+ retryEvent.setContext(memberId);
+ retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class));
+
+ region.hasSeenEvent(retryEvent);
+ assertTrue(retryEvent.getVersionTag().equals(holder.entryVersionTags.get(retryEventID)));
+ }
+
+ protected void recordPutAllEvents(DistributedRegion region, byte[] memId, long threadId,
+ boolean skipCallbacks, DistributedMember member, ClientProxyMembershipID memberId, int size) {
+ EntryEventImpl[] events = new EntryEventImpl[size];
+ EventTracker eventTracker = region.getEventTracker();
+ for (int i = 0; i < size; i++) {
+ events[i] = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key" + i, "value" + i,
+ null, false, member, !skipCallbacks, new EventID(memId, threadId, i + 1));
+ events[i].setContext(memberId);
+ events[i].setVersionTag(mock(VersionTag.class));
+ eventTracker.recordEvent(events[i]);
+ }
+ }
+
}