You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2020/01/22 19:10:31 UTC

[geode] branch feature/GEODE-7663 updated: GEODE-7663: Fix delta update inconsistency in client cache.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-7663
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-7663 by this push:
     new 0f60172  GEODE-7663: Fix delta update inconsistency in client cache.
0f60172 is described below

commit 0f60172ddcb427af471a062a560631e9110b6069
Author: Eric Shu <es...@EricMacBookPro.local>
AuthorDate: Wed Jan 22 11:02:38 2020 -0800

    GEODE-7663: Fix delta update inconsistency in client cache.
    
     * When a client performs a delta update, it is possible that some
       earlier delta updates have not yet been applied to the client cache.
     * If the delta is applied anyway, the queued delta updates are lost,
       because entry versioning discards them when they are received later.
     * The client now checks for a version mismatch; if one is found, the
       full value from the server is applied instead of just the delta.
---
 .../apache/geode/cache/client/internal/PutOp.java  |  31 ++++-
 .../geode/internal/cache/EntryEventImpl.java       |   2 +-
 .../cache/client/internal/PutOpJUnitTest.java      |  79 ++++++++++++-
 .../tier/sockets/ClientToServerDeltaDUnitTest.java | 126 +++++++++++++++++++++
 4 files changed, 232 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
index 798e1a7..baf59f4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
@@ -30,9 +30,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -266,7 +268,6 @@ public class PutOp {
     @Override
     protected Object processResponse(Message msg, Connection con) throws Exception {
       processAck(msg, con);
-
       if (prSingleHopEnabled) {
         Part part = msg.getPart(0);
         byte[] bytesReceived = part.getSerializedForm();
@@ -299,13 +300,39 @@ public class PutOp {
           VersionTag tag = (VersionTag) msg.getPart(partIdx).getObject();
           // we use the client's ID since we apparently don't track the server's ID in connections
           tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
-          event.setVersionTag(tag);
+          checkForDeltaConflictAndSetVersionTag(tag, con);
         }
         return oldValue;
       }
       return null;
     }
 
+    void checkForDeltaConflictAndSetVersionTag(VersionTag versionTag, Connection connection)
+        throws Exception {
+      RegionEntry regionEntry = ((EntryEventImpl) event).getRegionEntry();
+      if (regionEntry == null) {
+        event.setVersionTag(versionTag);
+        return;
+      }
+      VersionStamp versionStamp = regionEntry.getVersionStamp();
+      if (deltaSent && versionTag.getEntryVersion() != versionStamp.getEntryVersion() + 1) {
+        // Delta can't be applied, need to get full value.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Version is out of order. Need to get from server to perform delta update.");
+        }
+        Object object = getFullValue(connection);
+        event.setNewValue(object);
+      } else {
+        event.setVersionTag(versionTag);
+      }
+    }
+
+    Object getFullValue(Connection connection) throws Exception {
+      GetOp.GetOpImpl getOp =
+          new GetOp.GetOpImpl(region, key, callbackArg, prSingleHopEnabled, event);
+      return getOp.attempt(connection);
+    }
+
     /**
      * Process a response that contains an ack.
      *
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 40077e4..4168541 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -1675,7 +1675,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     this.re = re;
   }
 
-  RegionEntry getRegionEntry() {
+  public RegionEntry getRegionEntry() {
     return this.re;
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
index 1509a57..92c28fd 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
@@ -16,19 +16,38 @@ package org.apache.geode.cache.client.internal;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
 
 public class PutOpJUnitTest {
+  private final EntryEventImpl event = mock(EntryEventImpl.class);
+  private final VersionTag versionTag = mock(VersionTag.class);
+  private final RegionEntry entry = mock(RegionEntry.class);
+  private final VersionStamp versionStamp = mock(VersionStamp.class);
+  private final Connection connection = mock(Connection.class);
+
+  @Before
+  public void setup() {
+    when(event.getEventId()).thenReturn(new EventID());
+    when(entry.getVersionStamp()).thenReturn(versionStamp);
+  }
 
   private EntryEventImpl getEntryEvent() {
-    EntryEventImpl entryEvent = Mockito.mock(EntryEventImpl.class);
-    Mockito.when(entryEvent.getEventId()).thenReturn(new EventID());
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    when(entryEvent.getEventId()).thenReturn(new EventID());
     return entryEvent;
   }
 
@@ -57,4 +76,58 @@ public class PutOpJUnitTest {
     assertTrue(putOp.getMessage().isRetry());
   }
 
+  @Test
+  public void putOpSetVersionTagIfRegionEntryInEntryEventIsNull() throws Exception {
+    PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+        event, Operation.UPDATE,
+        false, false, null, false, false);
+    when(event.getRegionEntry()).thenReturn(null);
+
+    putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+    verify(event).setVersionTag(versionTag);
+  }
+
+  @Test
+  public void putOpSetVersionTagIfNotADeltaUpdate() throws Exception {
+    PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+        event, Operation.UPDATE,
+        false, false, null, true, false);
+    when(event.getRegionEntry()).thenReturn(entry);
+
+    putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+    verify(event).setVersionTag(versionTag);
+  }
+
+  @Test
+  public void putOpSetVersionTagIfDeltaUpdateVersionInOrder() throws Exception {
+    PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+        event, Operation.UPDATE,
+        false, false, null, false, false);
+    when(event.getRegionEntry()).thenReturn(entry);
+    when(versionTag.getEntryVersion()).thenReturn(2);
+    when(versionStamp.getEntryVersion()).thenReturn(1);
+
+    putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+    verify(event).setVersionTag(versionTag);
+  }
+
+  @Test
+  public void putOpGetFullValueIfDeltaUpdateVersionOutOfOrder() throws Exception {
+    Object object = new Object();
+    PutOp.PutOpImpl putOp =
+        spy(new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+            event, Operation.UPDATE,
+            false, false, null, false, false));
+    when(event.getRegionEntry()).thenReturn(entry);
+    when(versionTag.getEntryVersion()).thenReturn(3);
+    when(versionStamp.getEntryVersion()).thenReturn(1);
+    doReturn(object).when(putOp).getFullValue(connection);
+
+    putOp.checkForDeltaConflictAndSetVersionTag(versionTag, connection);
+
+    verify(event).setNewValue(object);
+  }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
index a03d333..ec79518 100755
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +27,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -62,6 +64,7 @@ import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.SerializationTest;
 
@@ -122,6 +125,11 @@ public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase {
 
   public static String LAST_KEY = "LAST_KEY";
 
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
   @Override
   public final void postSetUp() throws Exception {
     disconnectAllFromDS();
@@ -283,6 +291,124 @@ public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase {
     assertFalse("validation fails", err);
   }
 
+  @Test
+  public void testClientDeltaPropogationPutFetchesTheLatestValueWhenClientVersionIsOlder()
+      throws Exception {
+    // client did not register interest
+    Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest
+        .createServerCache(Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.TRUE))).intValue();
+
+    ClientToServerDeltaDUnitTest.createClientCache(
+        NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
+        Boolean.FALSE, Boolean.FALSE, null, Boolean.FALSE);
+    Region r = cache.getRegion(REGION_NAME);
+    DeltaTestImpl val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+        new TestObjectWithIdentifier("0", 0));
+    r.put(KEY1, val);
+
+    server.invoke(() -> {
+      Region region = cache.getRegion(REGION_NAME);
+      DeltaTestImpl val1 = (DeltaTestImpl) region.get(KEY1);
+      val1.NEED_TO_RESET_T0_DELTA = false;
+      val1.setIntVar(1);
+      region.put(KEY1, val1);
+    });
+
+    DeltaTestImpl val2 = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+        new TestObjectWithIdentifier("0", 0));
+    val2.setStr("1");
+    val2.NEED_TO_RESET_T0_DELTA = false;
+    r.put(KEY1, val2);
+
+    server.invoke(() -> {
+      Region region = cache.getRegion(REGION_NAME);
+      assertThat((DeltaTestImpl) region.get(KEY1)).isNotNull();
+    });
+
+    DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
+        new TestObjectWithIdentifier("0", 0));
+
+    server.invoke(() -> {
+      Region region = cache.getRegion(REGION_NAME);
+      GeodeAwaitility.await()
+          .untilAsserted(() -> assertThat((DeltaTestImpl) region.get(KEY1)).isEqualTo(expected));
+    });
+
+    GeodeAwaitility.await()
+        .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+  }
+
+  @Test
+  public void clientDeltaPutFetchesTheLatestVersionIfNotYetReceivedQueuedEvent() {
+    server.invoke(() -> setSlowStartForTesting());
+    server2.invoke(() -> setSlowStartForTesting());
+    initialise(false);
+
+    DeltaTestImpl original = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+        new TestObjectWithIdentifier("0", 0));
+    client.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      original.NEED_TO_RESET_T0_DELTA = false;
+      r.put(KEY1, original);
+    });
+
+    client2.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      assertThat(r.get(KEY1)).isEqualTo(original);
+    });
+
+    client.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
+      assertThat(val).isEqualTo(original);
+      val.NEED_TO_RESET_T0_DELTA = false;
+      val.setIntVar(1);
+      r.put(KEY1, val);
+    });
+
+    client2.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
+      // delta update should not arrive yet due to slow dispatcher
+      assertThat(val).isEqualTo(original);
+      val.NEED_TO_RESET_T0_DELTA = false;
+      val.setStr("1");
+      r.put(KEY1, val);
+      Object o = r.get(KEY1);
+      logger.info("object is " + o);
+    });
+
+    server.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      r.get(KEY1);
+    });
+
+    server2.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      r.get(KEY1);
+    });
+
+    DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
+        new TestObjectWithIdentifier("0", 0));
+
+    client.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      GeodeAwaitility.await()
+          .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+    });
+
+    client2.invoke(() -> {
+      Region r = cache.getRegion(REGION_NAME);
+      GeodeAwaitility.await()
+          .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+    });
+  }
+
+  private void setSlowStartForTesting() {
+    CacheClientProxy.isSlowStartForTesting = true;
+    System.setProperty("slowStartTimeForTesting", "5000");
+  }
+
   private static void putDeltaForCQ(String key, Integer numOfPuts, Integer[] cqIndices,
       Boolean[] satisfyQuery) {
     Region region = cache.getRegion(REGION_NAME);