You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/04 23:25:37 UTC

[09/27] geode git commit: GEODE-3380: There're 2 problems here 1) when removeAll is retried, it will get EntryNotFound exception. It should still put the remove event into the AEQ. 2) An old bug fix in 8.2 was not merged into develop: when removeAll enco

GEODE-3380: There are 2 problems here:
1) When removeAll is retried, it will get an EntryNotFound exception. It should still put the remove event
into the AEQ.
2) An old bug fix in 8.2 was not merged into develop: when removeAll encounters an EntryNotFound exception, it should
return the version tag of the tombstone.

This closes #674


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/71dc0f86
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/71dc0f86
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/71dc0f86

Branch: refs/heads/feature/GEODE-3299
Commit: 71dc0f865668974668077fe5d3289587944e0b34
Parents: d91096c
Author: zhouxh <gz...@pivotal.io>
Authored: Mon Jul 31 17:55:09 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Wed Aug 2 10:34:09 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/AbstractRegionMap.java | 13 +++
 .../cache/DistributedPutAllOperation.java       |  2 +-
 .../cache/DistributedRemoveAllOperation.java    |  2 +-
 .../cache/partitioned/RemoveAllPRMessage.java   |  8 ++
 .../geode/internal/cache/PutAllCSDUnitTest.java | 90 +++++++++++++++++++-
 5 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index fd5a430..40c8b07 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1359,6 +1359,10 @@ public abstract class AbstractRegionMap implements RegionMap {
                           owner.basicDestroyPart2(tombstone, event, inTokenMode,
                               true /* conflict with clear */, duringRI, true);
                           opCompleted = true;
+                        } else {
+                          Assert.assertTrue(event.getVersionTag() == null);
+                          Assert.assertTrue(newRe == tombstone);
+                          event.setVersionTag(getVersionTagFromStamp(tombstone.getVersionStamp()));
                         }
                       } catch (ConcurrentCacheModificationException ccme) {
                         VersionTag tag = event.getVersionTag();
@@ -1564,6 +1568,15 @@ public abstract class AbstractRegionMap implements RegionMap {
     return false;
   }
 
+  private VersionTag getVersionTagFromStamp(VersionStamp stamp) {
+    VersionTag tag = VersionTag.create(stamp.getMemberID());
+    tag.setEntryVersion(stamp.getEntryVersion());
+    tag.setRegionVersion(stamp.getRegionVersion());
+    tag.setVersionTimeStamp(stamp.getVersionTimeStamp());
+    tag.setDistributedSystemId(stamp.getDistributedSystemId());
+    return tag;
+  }
+
   public void txApplyDestroy(Object key, TransactionId txId, TXRmtEvent txEvent,
       boolean inTokenMode, boolean inRI, Operation op, EventID eventId, Object aCallbackArgument,
       List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 4dcb0b7..37a6703 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -1098,7 +1098,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
       try {
         super.basicOperateOnRegion(ev, rgn);
       } finally {
-        if (ev.getVersionTag() != null && !ev.getVersionTag().isRecorded()) {
+        if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
           if (rgn.getVersionVector() != null) {
             rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index a4661b6..8ea4f97 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -891,7 +891,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
         dispatchElidedEvent(rgn, ev);
         this.appliedOperation = false;
       } finally {
-        if (ev.getVersionTag() != null && !ev.getVersionTag().isRecorded()) {
+        if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
           if (rgn.getVersionVector() != null) {
             rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 1898461..ebfc5ed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -469,6 +469,14 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
                   }
                 } catch (EntryNotFoundException ignore) {
                   didRemove = true;
+                  if (ev.isPossibleDuplicate() && ev.hasValidVersionTag()) {
+                    op.addEntry(ev);
+                    if (logger.isDebugEnabled()) {
+                      logger.debug(
+                          "RemoveAllPRMessage.doLocalRemoveAll:notify client and gateway for not-found-entry:"
+                              + ev);
+                    }
+                  }
                   if (ev.getVersionTag() == null) {
                     if (logger.isDebugEnabled()) {
                       logger.debug(

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
index 2c6252b..d1c0004 100755
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -77,6 +78,7 @@ import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.cache.util.CacheWriterAdapter;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.ClientServerTestCase;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.test.dunit.Assert;
@@ -500,6 +502,81 @@ public class PutAllCSDUnitTest extends ClientServerTestCase {
   }
 
   /**
+   * Create PR without redundancy on 2 servers with lucene index. Feed some keys. From a client, do
+   * removeAll on keys in server1. During the removeAll, restart server1 and trigger the removeAll
+   * to retry. The retried removeAll should return the version tag of tombstones. Do removeAll again
+   * on the same keys; it should get the version tags again.
+   */
+  @Test
+  public void shouldReturnVersionTagOfTombstoneVersionWhenRemoveAllRetried() throws CacheException, InterruptedException {
+    final String title = "test51871:";
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+
+    final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
+
+    // set notifyBySubscription=false to test local-invalidates
+    final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 0, "ds1");
+    createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, true);
+
+    client1.invoke(new CacheSerializableRunnable(title + "client1 add listener and putAll") {
+      @Override
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title + "verify Bridge Server 1") {
+      @Override
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });
+
+    @SuppressWarnings("unchecked")
+    VersionedObjectList versions =
+        (VersionedObjectList) client1.invoke(new SerializableCallable(title + "client1 removeAll") {
+          @Override
+          public Object call() throws CacheException {
+            Region region = getRootRegion().getSubregion(regionName);
+            VersionedObjectList versions = doRemoveAll(regionName, "key-", numberOfEntries);
+            assertEquals(0, region.size());
+            return versions;
+          }
+        });
+
+    @SuppressWarnings("unchecked")
+    VersionedObjectList versionsAfterRetry = (VersionedObjectList) client1
+        .invoke(new SerializableCallable(title + "client1 removeAll again") {
+          @Override
+          public Object call() throws CacheException {
+            Region region = getRootRegion().getSubregion(regionName);
+            VersionedObjectList versions = doRemoveAll(regionName, "key-", numberOfEntries);
+            assertEquals(0, region.size());
+            return versions;
+          }
+        });
+
+    LogWriterUtils.getLogWriter().info("Version tags are:" + versions.getVersionTags() + ":"
+        + versionsAfterRetry.getVersionTags());
+    assertEquals(versionsAfterRetry.getVersionTags(), versions.getVersionTags());
+
+    // clean up
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
    * Tests putAll and removeAll to 2 servers. Use Case: 1) putAll from a single-threaded client to a
    * replicated region 2) putAll from a multi-threaded client to a replicated region 3)
    */
@@ -4092,14 +4169,21 @@ public class PutAllCSDUnitTest extends ClientServerTestCase {
     return region;
   }
 
-  protected Region doRemoveAll(String regionName, String keyStub, int numEntries) {
+  protected VersionedObjectList doRemoveAll(String regionName, String keyStub, int numEntries) {
     Region region = getRootRegion().getSubregion(regionName);
     ArrayList<String> keys = new ArrayList<String>();
     for (int i = 0; i < numEntries; i++) {
       keys.add(keyStub + i);
     }
-    region.removeAll(keys, "removeAllCallback");
-    return region;
+    // region.removeAll(keys, "removeAllCallback");
+    LocalRegion lr = (LocalRegion) region;
+    final EntryEventImpl event = EntryEventImpl.create(lr, Operation.REMOVEALL_DESTROY, null, null,
+        "removeAllCallback", false, lr.getMyId());
+    event.disallowOffHeapValues();
+    DistributedRemoveAllOperation removeAllOp =
+        new DistributedRemoveAllOperation(event, keys.size(), false);
+    VersionedObjectList versions = lr.basicRemoveAll((Collection) keys, removeAllOp, null);
+    return versions;
   }
 
   public static void waitTillNotify(Object lock_object, int waitTime, boolean ready) {