You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2020/01/22 19:10:31 UTC
[geode] branch feature/GEODE-7663 updated: GEODE-7663: Fix delta
update inconsistency in client cache.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-7663
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7663 by this push:
new 0f60172 GEODE-7663: Fix delta update inconsistency in client cache.
0f60172 is described below
commit 0f60172ddcb427af471a062a560631e9110b6069
Author: Eric Shu <es...@EricMacBookPro.local>
AuthorDate: Wed Jan 22 11:02:38 2020 -0800
GEODE-7663: Fix delta update inconsistency in client cache.
* When performing a delta update on a client, it is possible that
some delta updates have not yet been applied to the client cache.
* If the delta update is applied, some queued delta updates would be lost,
because entry versioning will discard them when they are received later.
* Now it checks whether there is a versioning mismatch; if there is,
the full value from the server is applied instead of just the delta.
---
.../apache/geode/cache/client/internal/PutOp.java | 31 ++++-
.../geode/internal/cache/EntryEventImpl.java | 2 +-
.../cache/client/internal/PutOpJUnitTest.java | 79 ++++++++++++-
.../tier/sockets/ClientToServerDeltaDUnitTest.java | 126 +++++++++++++++++++++
4 files changed, 232 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
index 798e1a7..baf59f4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
@@ -30,9 +30,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -266,7 +268,6 @@ public class PutOp {
@Override
protected Object processResponse(Message msg, Connection con) throws Exception {
processAck(msg, con);
-
if (prSingleHopEnabled) {
Part part = msg.getPart(0);
byte[] bytesReceived = part.getSerializedForm();
@@ -299,13 +300,39 @@ public class PutOp {
VersionTag tag = (VersionTag) msg.getPart(partIdx).getObject();
// we use the client's ID since we apparently don't track the server's ID in connections
tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
- event.setVersionTag(tag);
+ checkForDeltaConflictAndSetVersionTag(tag, con);
}
return oldValue;
}
return null;
}
+ void checkForDeltaConflictAndSetVersionTag(VersionTag versionTag, Connection connection)
+ throws Exception {
+ RegionEntry regionEntry = ((EntryEventImpl) event).getRegionEntry();
+ if (regionEntry == null) {
+ event.setVersionTag(versionTag);
+ return;
+ }
+ VersionStamp versionStamp = regionEntry.getVersionStamp();
+ if (deltaSent && versionTag.getEntryVersion() != versionStamp.getEntryVersion() + 1) {
+ // Delta can't be applied, need to get full value.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Version is out of order. Need to get from server to perform delta update.");
+ }
+ Object object = getFullValue(connection);
+ event.setNewValue(object);
+ } else {
+ event.setVersionTag(versionTag);
+ }
+ }
+
+ Object getFullValue(Connection connection) throws Exception {
+ GetOp.GetOpImpl getOp =
+ new GetOp.GetOpImpl(region, key, callbackArg, prSingleHopEnabled, event);
+ return getOp.attempt(connection);
+ }
+
/**
* Process a response that contains an ack.
*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 40077e4..4168541 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -1675,7 +1675,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
this.re = re;
}
- RegionEntry getRegionEntry() {
+ public RegionEntry getRegionEntry() {
return this.re;
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
index 1509a57..92c28fd 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PutOpJUnitTest.java
@@ -16,19 +16,38 @@ package org.apache.geode.cache.client.internal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.RegionEntry;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
public class PutOpJUnitTest {
+ private final EntryEventImpl event = mock(EntryEventImpl.class);
+ private final VersionTag versionTag = mock(VersionTag.class);
+ private final RegionEntry entry = mock(RegionEntry.class);
+ private final VersionStamp versionStamp = mock(VersionStamp.class);
+ private final Connection connection = mock(Connection.class);
+
+ @Before
+ public void setup() {
+ when(event.getEventId()).thenReturn(new EventID());
+ when(entry.getVersionStamp()).thenReturn(versionStamp);
+ }
private EntryEventImpl getEntryEvent() {
- EntryEventImpl entryEvent = Mockito.mock(EntryEventImpl.class);
- Mockito.when(entryEvent.getEventId()).thenReturn(new EventID());
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ when(entryEvent.getEventId()).thenReturn(new EventID());
return entryEvent;
}
@@ -57,4 +76,58 @@ public class PutOpJUnitTest {
assertTrue(putOp.getMessage().isRetry());
}
+ @Test
+ public void putOpSetVersionTagIfRegionEntryInEntryEventIsNull() throws Exception {
+ PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+ event, Operation.UPDATE,
+ false, false, null, false, false);
+ when(event.getRegionEntry()).thenReturn(null);
+
+ putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+ verify(event).setVersionTag(versionTag);
+ }
+
+ @Test
+ public void putOpSetVersionTagIfNotADeltaUpdate() throws Exception {
+ PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+ event, Operation.UPDATE,
+ false, false, null, true, false);
+ when(event.getRegionEntry()).thenReturn(entry);
+
+ putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+ verify(event).setVersionTag(versionTag);
+ }
+
+ @Test
+ public void putOpSetVersionTagIfDeltaUpdateVersionInOrder() throws Exception {
+ PutOp.PutOpImpl putOp = new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+ event, Operation.UPDATE,
+ false, false, null, false, false);
+ when(event.getRegionEntry()).thenReturn(entry);
+ when(versionTag.getEntryVersion()).thenReturn(2);
+ when(versionStamp.getEntryVersion()).thenReturn(1);
+
+ putOp.checkForDeltaConflictAndSetVersionTag(versionTag, null);
+
+ verify(event).setVersionTag(versionTag);
+ }
+
+ @Test
+ public void putOpGetFullValueIfDeltaUpdateVersionOutOfOrder() throws Exception {
+ Object object = new Object();
+ PutOp.PutOpImpl putOp =
+ spy(new PutOp.PutOpImpl("testRegion", "testKey", "testValue", new byte[10],
+ event, Operation.UPDATE,
+ false, false, null, false, false));
+ when(event.getRegionEntry()).thenReturn(entry);
+ when(versionTag.getEntryVersion()).thenReturn(3);
+ when(versionStamp.getEntryVersion()).thenReturn(1);
+ doReturn(object).when(putOp).getFullValue(connection);
+
+ putOp.checkForDeltaConflictAndSetVersionTag(versionTag, connection);
+
+ verify(event).setNewValue(object);
+ }
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
index a03d333..ec79518 100755
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientToServerDeltaDUnitTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -26,6 +27,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -62,6 +64,7 @@ import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.SerializationTest;
@@ -122,6 +125,11 @@ public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase {
public static String LAST_KEY = "LAST_KEY";
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
@@ -283,6 +291,124 @@ public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase {
assertFalse("validation fails", err);
}
+ @Test
+ public void testClientDeltaPropogationPutFetchesTheLatestValueWhenClientVersionIsOlder()
+ throws Exception {
+ // client did not register interest
+ Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest
+ .createServerCache(Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.TRUE))).intValue();
+
+ ClientToServerDeltaDUnitTest.createClientCache(
+ NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
+ Boolean.FALSE, Boolean.FALSE, null, Boolean.FALSE);
+ Region r = cache.getRegion(REGION_NAME);
+ DeltaTestImpl val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+ new TestObjectWithIdentifier("0", 0));
+ r.put(KEY1, val);
+
+ server.invoke(() -> {
+ Region region = cache.getRegion(REGION_NAME);
+ DeltaTestImpl val1 = (DeltaTestImpl) region.get(KEY1);
+ val1.NEED_TO_RESET_T0_DELTA = false;
+ val1.setIntVar(1);
+ region.put(KEY1, val1);
+ });
+
+ DeltaTestImpl val2 = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+ new TestObjectWithIdentifier("0", 0));
+ val2.setStr("1");
+ val2.NEED_TO_RESET_T0_DELTA = false;
+ r.put(KEY1, val2);
+
+ server.invoke(() -> {
+ Region region = cache.getRegion(REGION_NAME);
+ assertThat((DeltaTestImpl) region.get(KEY1)).isNotNull();
+ });
+
+ DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
+ new TestObjectWithIdentifier("0", 0));
+
+ server.invoke(() -> {
+ Region region = cache.getRegion(REGION_NAME);
+ GeodeAwaitility.await()
+ .untilAsserted(() -> assertThat((DeltaTestImpl) region.get(KEY1)).isEqualTo(expected));
+ });
+
+ GeodeAwaitility.await()
+ .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+ }
+
+ @Test
+ public void clientDeltaPutFetchesTheLatestVersionIfNotYetReceivedQueuedEvent() {
+ server.invoke(() -> setSlowStartForTesting());
+ server2.invoke(() -> setSlowStartForTesting());
+ initialise(false);
+
+ DeltaTestImpl original = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
+ new TestObjectWithIdentifier("0", 0));
+ client.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ original.NEED_TO_RESET_T0_DELTA = false;
+ r.put(KEY1, original);
+ });
+
+ client2.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ assertThat(r.get(KEY1)).isEqualTo(original);
+ });
+
+ client.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
+ assertThat(val).isEqualTo(original);
+ val.NEED_TO_RESET_T0_DELTA = false;
+ val.setIntVar(1);
+ r.put(KEY1, val);
+ });
+
+ client2.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
+ // delta update should not arrive yet due to slow dispatcher
+ assertThat(val).isEqualTo(original);
+ val.NEED_TO_RESET_T0_DELTA = false;
+ val.setStr("1");
+ r.put(KEY1, val);
+ Object o = r.get(KEY1);
+ logger.info("object is " + o);
+ });
+
+ server.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ r.get(KEY1);
+ });
+
+ server2.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ r.get(KEY1);
+ });
+
+ DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
+ new TestObjectWithIdentifier("0", 0));
+
+ client.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ GeodeAwaitility.await()
+ .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+ });
+
+ client2.invoke(() -> {
+ Region r = cache.getRegion(REGION_NAME);
+ GeodeAwaitility.await()
+ .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
+ });
+ }
+
+ private void setSlowStartForTesting() {
+ CacheClientProxy.isSlowStartForTesting = true;
+ System.setProperty("slowStartTimeForTesting", "5000");
+ }
+
private static void putDeltaForCQ(String key, Integer numOfPuts, Integer[] cqIndices,
Boolean[] satisfyQuery) {
Region region = cache.getRegion(REGION_NAME);