You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/05/03 20:53:23 UTC

[09/10] geode git commit: GEODE-2847: Get correct version tags for retried bulk operation

GEODE-2847: Get correct version tags for retried bulk operation


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/16832655
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/16832655
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/16832655

Branch: refs/heads/feature/GEM-1353
Commit: 16832655d18592ddaf3c89979be30e5e7caa10f1
Parents: 0dd2552
Author: eshu <es...@pivotal.io>
Authored: Wed May 3 10:14:08 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Wed May 3 10:14:08 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/EventTracker.java      | 117 +++++++------------
 .../geode/internal/cache/LocalRegion.java       |  24 ++--
 .../cache/partitioned/PutAllPRMessage.java      |   2 +-
 .../cache/partitioned/RemoveAllPRMessage.java   |   2 +-
 .../tier/sockets/ClientProxyMembershipID.java   |   2 +-
 .../AbstractDistributedRegionJUnitTest.java     |  15 ++-
 .../cache/DistributedRegionJUnitTest.java       |  54 ++++++++-
 7 files changed, 120 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
index 2ddfdc4..278367c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
@@ -59,14 +59,12 @@ public class EventTracker {
       new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100);
 
   /**
-   * a mapping of originator to bulkOp's last status (true means finished processing) applied to
-   * this cache.
+   * a mapping of originator to bulkOps
    *
-   * Keys are instances of @link {@link ThreadIdentifier}, values are instances of
-   * {@link BulkOpProcessed}.
+   * Keys are instances of @link {@link ThreadIdentifier}
    */
-  private final ConcurrentMap<ThreadIdentifier, BulkOpProcessed> recordedBulkOps =
-      new ConcurrentHashMap<ThreadIdentifier, BulkOpProcessed>(100);
+  private final ConcurrentMap<ThreadIdentifier, Object> recordedBulkOps =
+      new ConcurrentHashMap<ThreadIdentifier, Object>(100);
 
   /**
    * a mapping of originator to bulkOperation's last version tags. This map differs from
@@ -141,7 +139,7 @@ public class EventTracker {
   public EventTracker(LocalRegion region) {
     this.cache = region.cache;
     this.name = "Event Tracker for " + region.getName();
-    this.initializationLatch = new StoppableCountDownLatch(region.stopper, 1);
+    this.initializationLatch = new StoppableCountDownLatch(region.getStopper(), 1);
   }
 
   /** start this event tracker */
@@ -307,19 +305,22 @@ public class EventTracker {
       }
     }
 
+    EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
+    if (logger.isTraceEnabled()) {
+      logger.trace("region event tracker recording {}", event);
+    }
+    recordSeqno(membershipID, newEvh);
+
     // If this is a bulkOp, and concurrency checks are enabled, we need to
     // save the version tag in case we retry.
-    if (lr.concurrencyChecksEnabled
+    // Make recordBulkOp version tag after recordSeqno, so that recordBulkOpStart
+    // in a retry bulk op would not incorrectly remove the saved version tag in
+    // recordedBulkOpVersionTags
+    if (lr.getConcurrencyChecksEnabled()
         && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll())
         && lr.getServerProxy() == null) {
       recordBulkOpEvent(event, membershipID);
     }
-
-    EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
-    if (logger.isTraceEnabled()) {
-      logger.trace("region event tracker recording {}", event);
-    }
-    recordSeqno(membershipID, newEvh);
   }
 
   /**
@@ -542,24 +543,19 @@ public class EventTracker {
     ThreadIdentifier membershipID =
         new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
 
-    BulkOpProcessed opSyncObj =
-        recordedBulkOps.putIfAbsent(membershipID, new BulkOpProcessed(false));
-    if (opSyncObj == null) {
-      opSyncObj = recordedBulkOps.get(membershipID);
-    }
+    Object opSyncObj = null;
+    do {
+      opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new Object());
+      if (opSyncObj == null) {
+        opSyncObj = recordedBulkOps.get(membershipID);
+      }
+    } while (opSyncObj == null);
+
     synchronized (opSyncObj) {
       try {
-        if (opSyncObj.getStatus() && logger.isDebugEnabled()) {
-          logger.debug("SyncBulkOp: The operation was performed by another thread.");
-        } else {
-          recordBulkOpStart(membershipID);
-
-          // Perform the bulk op
-          r.run();
-          // set to true in case another thread is waiting at sync
-          opSyncObj.setStatus(true);
-          recordedBulkOps.remove(membershipID);
-        }
+        recordBulkOpStart(membershipID, eventID);
+        // Perform the bulk op
+        r.run();
       } finally {
         recordedBulkOps.remove(membershipID);
       }
@@ -567,14 +563,23 @@ public class EventTracker {
   }
 
   /**
-   * Called when a bulkOp is started on the local region. Used to clear event tracker state from the
-   * last bulkOp.
+   * Called when a new bulkOp is started on the local region. Used to clear event tracker state from
+   * the last bulkOp.
    */
-  public void recordBulkOpStart(ThreadIdentifier tid) {
+  public void recordBulkOpStart(ThreadIdentifier tid, EventID eventID) {
     if (logger.isDebugEnabled()) {
       logger.debug("recording bulkOp start for {}", tid.expensiveToString());
     }
-    this.recordedBulkOpVersionTags.remove(tid);
+    EventSeqnoHolder evh = recordedEvents.get(tid);
+    if (evh == null) {
+      return;
+    }
+    synchronized (evh) {
+      // only remove it when a new bulk op occurs
+      if (eventID.getSequenceID() > evh.lastSeqno) {
+        this.recordedBulkOpVersionTags.remove(tid);
+      }
+    }
   }
 
   /**
@@ -660,50 +665,6 @@ public class EventTracker {
   }
 
   /**
-   * A status tracker for each bulk operation (putAll or removeAll) from originators specified by
-   * membershipID and threadID in the cache processed is true means the bulk op is processed by one
-   * thread no need to redo it by other threads.
-   * 
-   * @since GemFire 5.7
-   */
-  static class BulkOpProcessed {
-    /** whether the op is processed */
-    private boolean processed;
-
-    /**
-     * creates a new instance to save status of a bulk op
-     * 
-     * @param status true if the op has been processed
-     */
-    BulkOpProcessed(boolean status) {
-      this.processed = status;
-    }
-
-    /**
-     * setter method to change the status
-     * 
-     * @param status true if the op has been processed
-     */
-    void setStatus(boolean status) {
-      this.processed = status;
-    }
-
-    /**
-     * getter method to peek the current status
-     * 
-     * @return current status
-     */
-    boolean getStatus() {
-      return this.processed;
-    }
-
-    @Override
-    public String toString() {
-      return "BULKOP(" + this.processed + ")";
-    }
-  }
-
-  /**
    * A holder for the version tags generated for a bulk operation (putAll or removeAll). These
    * version tags are retrieved when a bulk op is retried.
    * 

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8c061b0..2dec53b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -505,6 +505,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return new Stopper();
   }
 
+  protected CancelCriterion getStopper() {
+    return this.stopper;
+  }
+
   private final TestCallable testCallable;
 
   /**
@@ -682,10 +686,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
 
 
   /**
-   * Test method for getting the event tracker.
    * 
-   * this method is for testing only. Other region classes may track events using different
-   * mechanisms than EventTrackers
+   * Other region classes may track events using different mechanisms than EventTrackers
    */
   protected EventTracker getEventTracker() {
     return this.eventTracker;
@@ -3475,6 +3477,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
+  protected boolean getEnableConcurrencyChecks() {
+    return this.concurrencyChecksEnabled;
+  }
+
   /**
    * validate attributes of subregion being created, sent to parent
    *
@@ -6151,8 +6157,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       isDup = this.eventTracker.hasSeenEvent(event);
       if (isDup) {
         event.setPossibleDuplicate(true);
-        if (this.concurrencyChecksEnabled && event.getVersionTag() == null) {
-          event.setVersionTag(findVersionTagForClientEvent(event.getEventId()));
+        if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) {
+          if (event.isBulkOpInProgress()) {
+            event.setVersionTag(findVersionTagForClientBulkOp(event.getEventId()));
+          } else {
+            event.setVersionTag(findVersionTagForClientEvent(event.getEventId()));
+          }
         }
       } else {
         // bug #48205 - a retried PR operation may already have a version assigned to it
@@ -6253,9 +6263,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  public void recordBulkOpStart(ThreadIdentifier membershipID) {
+  public void recordBulkOpStart(ThreadIdentifier membershipID, EventID eventID) {
     if (this.eventTracker != null && !isTX()) {
-      this.eventTracker.recordBulkOpStart(membershipID);
+      this.eventTracker.recordBulkOpStart(membershipID, eventID);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 27f5aa0..ed1fe0a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -438,7 +438,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply {
             EventID eventID = putAllPRData[0].getEventID();
             ThreadIdentifier membershipID =
                 new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-            bucketRegion.recordBulkOpStart(membershipID);
+            bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           bucketRegion.waitUntilLocked(keys);
           boolean lockedForPrimary = false;

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index f4f6299..0e38ddc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -434,7 +434,7 @@ public final class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             EventID eventID = removeAllPRData[0].getEventID();
             ThreadIdentifier membershipID =
                 new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
-            bucketRegion.recordBulkOpStart(membershipID);
+            bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           bucketRegion.waitUntilLocked(keys);
           boolean lockedForPrimary = false;

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..2fd508b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -38,7 +38,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
  * 
  * 
  */
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
     implements DataSerializableFixedID, Serializable, Externalizable {
 
   private static final Logger logger = LogService.getLogger();

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
index ba2f794..a8cbdde 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractDistributedRegionJUnitTest.java
@@ -106,7 +106,8 @@ public abstract class AbstractDistributedRegionJUnitTest {
   protected abstract void verifyDistributeUpdateEntryVersion(DistributedRegion region,
       EntryEventImpl event, int cnt);
 
-  protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) {
+  protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled,
+      boolean testHasSeenEvent) {
     GemFireCacheImpl cache = Fakes.cache();
 
     // create region attributes and internal region arguments
@@ -122,14 +123,16 @@ public abstract class AbstractDistributedRegionJUnitTest {
     }
 
     doNothing().when(region).notifyGatewaySender(any(), any());
-    doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+    if (!testHasSeenEvent) {
+      doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+    }
     return region;
   }
 
   @Test
   public void testConcurrencyFalseTagNull() {
     // case 1: concurrencyCheckEanbled = false, version tag is null: distribute
-    DistributedRegion region = prepare(false);
+    DistributedRegion region = prepare(false, false);
     EntryEventImpl event = createDummyEvent(region);
     assertNull(event.getVersionTag());
     doTest(region, event, 1);
@@ -138,7 +141,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
   @Test
   public void testConcurrencyTrueTagNull() {
     // case 2: concurrencyCheckEanbled = true, version tag is null: not to distribute
-    DistributedRegion region = prepare(true);
+    DistributedRegion region = prepare(true, false);
     EntryEventImpl event = createDummyEvent(region);
     assertNull(event.getVersionTag());
     doTest(region, event, 0);
@@ -147,7 +150,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
   @Test
   public void testConcurrencyTrueTagInvalid() {
     // case 3: concurrencyCheckEanbled = true, version tag is invalid: not to distribute
-    DistributedRegion region = prepare(true);
+    DistributedRegion region = prepare(true, false);
     EntryEventImpl event = createDummyEvent(region);
     VersionTag tag = createVersionTag(false);
     event.setVersionTag(tag);
@@ -158,7 +161,7 @@ public abstract class AbstractDistributedRegionJUnitTest {
   @Test
   public void testConcurrencyTrueTagValid() {
     // case 4: concurrencyCheckEanbled = true, version tag is valid: distribute
-    DistributedRegion region = prepare(true);
+    DistributedRegion region = prepare(true, false);
     EntryEventImpl event = createDummyEvent(region);
     VersionTag tag = createVersionTag(true);
     event.setVersionTag(tag);

http://git-wip-us.apache.org/repos/asf/geode/blob/16832655/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 7525f35..ce21c67 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,15 +14,23 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.*;
 
-import org.junit.experimental.categories.Category;
+import java.util.concurrent.ConcurrentMap;
 
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.EventTracker.BulkOpHolder;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -99,5 +107,47 @@ public class DistributedRegionJUnitTest extends AbstractDistributedRegionJUnitTe
     }
   }
 
+  @Test
+  public void retriedBulkOpGetsSavedVersionTag() {
+    DistributedRegion region = prepare(true, true);
+    DistributedMember member = mock(DistributedMember.class);
+    ClientProxyMembershipID memberId = mock(ClientProxyMembershipID.class);
+    doReturn(false).when(region).isUsedForPartitionedRegionBucket();
+
+    byte[] memId = {1, 2, 3};
+    long threadId = 1;
+    long retrySeqId = 1;
+    ThreadIdentifier tid = new ThreadIdentifier(memId, threadId);
+    EventID retryEventID = new EventID(memId, threadId, retrySeqId);
+    boolean skipCallbacks = true;
+    int size = 2;
+    recordPutAllEvents(region, memId, threadId, skipCallbacks, member, memberId, size);
+    EventTracker eventTracker = region.getEventTracker();
+
+    ConcurrentMap<ThreadIdentifier, BulkOpHolder> map = eventTracker.getRecordedBulkOpVersionTags();
+    BulkOpHolder holder = map.get(tid);
+
+    EntryEventImpl retryEvent = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key1",
+        "value1", null, false, member, !skipCallbacks, retryEventID);
+    retryEvent.setContext(memberId);
+    retryEvent.setPutAllOperation(mock(DistributedPutAllOperation.class));
+
+    region.hasSeenEvent(retryEvent);
+    assertTrue(retryEvent.getVersionTag().equals(holder.entryVersionTags.get(retryEventID)));
+  }
+
+  protected void recordPutAllEvents(DistributedRegion region, byte[] memId, long threadId,
+      boolean skipCallbacks, DistributedMember member, ClientProxyMembershipID memberId, int size) {
+    EntryEventImpl[] events = new EntryEventImpl[size];
+    EventTracker eventTracker = region.getEventTracker();
+    for (int i = 0; i < size; i++) {
+      events[i] = EntryEventImpl.create(region, Operation.PUTALL_CREATE, "key" + i, "value" + i,
+          null, false, member, !skipCallbacks, new EventID(memId, threadId, i + 1));
+      events[i].setContext(memberId);
+      events[i].setVersionTag(mock(VersionTag.class));
+      eventTracker.recordEvent(events[i]);
+    }
+  }
+
 }