You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/07/14 14:02:46 UTC
[geode] branch develop updated: GEODE-8351: DUnit tests for Delta
Propagation (#5364)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 1ebd307 GEODE-8351: DUnit tests for Delta Propagation (#5364)
1ebd307 is described below
commit 1ebd307ae1ae87c53f0ff6b8f0c83ab2feda93cd
Author: Sarah Abbey <41...@users.noreply.github.com>
AuthorDate: Tue Jul 14 10:02:00 2020 -0400
GEODE-8351: DUnit tests for Delta Propagation (#5364)
---
.../geode/redis/internal/data/DeltaDUnitTest.java | 315 +++++++++++++++++++++
.../redis/internal/data/ByteArrayWrapper.java | 6 +-
.../geode/redis/internal/data/NullRedisSet.java | 2 +
.../apache/geode/redis/internal/data/RedisSet.java | 2 +
4 files changed, 323 insertions(+), 2 deletions(-)
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/DeltaDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/DeltaDUnitTest.java
new file mode 100644
index 0000000..e07e71b
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/DeltaDUnitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.data;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+@SuppressWarnings("unchecked")
+public class DeltaDUnitTest {
+
+ @ClassRule
+ public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+ private static final String LOCAL_HOST = "127.0.0.1";
+ private static final int ITERATION_COUNT = 1000;
+ private static final int JEDIS_TIMEOUT =
+ Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+ private static Jedis jedis1;
+ private static Jedis jedis2;
+
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+
+ private static int redisServerPort1;
+ private static int redisServerPort2;
+
+ @BeforeClass
+ public static void classSetup() {
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+ server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+ server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+
+ redisServerPort1 = clusterStartUp.getRedisPort(1);
+ redisServerPort2 = clusterStartUp.getRedisPort(2);
+
+ jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+ jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+ }
+
+ @Before
+ public void testSetup() {
+ jedis1.flushAll();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ jedis1.disconnect();
+ jedis2.disconnect();
+
+ server1.stop();
+ server2.stop();
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAppending() {
+ String key = "key";
+ String baseValue = "value-";
+ jedis1.set(key, baseValue);
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ jedis1.append(key, String.valueOf(i));
+
+ byte[] server1LocalValue = server1.invoke(() -> (byte[]) getLocalData(key, r -> {
+ RedisData localValue = r.get(new ByteArrayWrapper(key.getBytes()));
+
+ try {
+ return BlobHelper.serializeToBlob(localValue);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ byte[] server2LocalValue = server2.invoke(() -> (byte[]) getLocalData(key, r -> {
+ RedisData localValue = r.get(new ByteArrayWrapper(key.getBytes()));
+
+ try {
+ return BlobHelper.serializeToBlob(localValue);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ assertThat(Arrays.equals(server1LocalValue, server2LocalValue));
+ }
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAddingToSet() {
+ String key = "key";
+
+ List<String> members = makeMemberList(ITERATION_COUNT, "member-");
+ for (String member : members) {
+ jedis1.sadd(key, member);
+ Set<ByteArrayWrapper> server1LocalSet =
+ server1.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.smembers();
+ }));
+
+ Set<ByteArrayWrapper> server2LocalSet =
+ server2.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.smembers();
+ }));
+
+ assertThat(server1LocalSet).containsExactlyInAnyOrder(server2LocalSet.toArray(
+ new ByteArrayWrapper[] {}));
+ }
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenRemovingFromSet() {
+ String key = "key";
+
+ List<String> members = makeMemberList(ITERATION_COUNT, "member-");
+ jedis1.sadd(key, members.toArray(new String[] {}));
+
+ for (String member : members) {
+ jedis1.srem(key, member);
+ Set<ByteArrayWrapper> server1LocalSet =
+ server1.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.smembers();
+ }));
+
+ Set<ByteArrayWrapper> server2LocalSet =
+ server2.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.smembers();
+ }));
+
+ if (server1LocalSet == null || server2LocalSet == null) {
+ assertThat(server1LocalSet).isEqualTo(server2LocalSet);
+ } else {
+ assertThat(server1LocalSet).containsExactlyInAnyOrder(server2LocalSet.toArray(
+ new ByteArrayWrapper[] {}));
+ }
+ }
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAddingToHash() {
+ String key = "key";
+
+ Map<String, String> testMap = makeHashMap(ITERATION_COUNT, "field-", "value-");
+
+ for (String field : testMap.keySet()) {
+ jedis1.hset(key, field, testMap.get(field));
+
+ Collection<ByteArrayWrapper> server1LocalHash =
+ server1.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
+ return localSet.hgetall();
+ }));
+
+ Collection<ByteArrayWrapper> server2LocalHash =
+ server2.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
+ return localSet.hgetall();
+ }));
+
+ assertThat(server1LocalHash).containsExactlyInAnyOrder(server2LocalHash.toArray(
+ new ByteArrayWrapper[] {}));
+ }
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenRemovingFromHash() {
+ String key = "key";
+
+ Map<String, String> testMap = makeHashMap(ITERATION_COUNT, "field-", "value-");
+ jedis1.hset(key, testMap);
+
+ for (String field : testMap.keySet()) {
+ jedis1.hdel(key, field, testMap.get(field));
+
+ Collection<ByteArrayWrapper> server1LocalHash =
+ server1.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.hgetall();
+ }));
+
+ Collection<ByteArrayWrapper> server2LocalHash =
+ server2.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
+ RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.hgetall();
+ }));
+
+ if (server1LocalHash == null || server2LocalHash == null) {
+ assertThat(server1LocalHash).isEqualTo(server2LocalHash);
+ } else {
+ assertThat(server1LocalHash).containsExactlyInAnyOrder(server2LocalHash.toArray(
+ new ByteArrayWrapper[] {}));
+ }
+ }
+ }
+
+ @Test
+ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenExpiring() {
+ String baseKey = "key-";
+
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ String key = baseKey + i;
+ jedis1.set(key, "value");
+ jedis1.expire(key, 20);
+ long server1LocalExpirtionTimestamp = server1.invoke(() -> (long) getLocalData(key, r -> {
+ RedisData localSet = r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.getExpirationTimestamp();
+ }));
+
+ long server2LocalExpirationTimestamp = server2.invoke(() -> (long) getLocalData(key, r -> {
+ RedisData localSet = r.get(new ByteArrayWrapper(key.getBytes()));
+ if (localSet == null) {
+ return null;
+ }
+ return localSet.getExpirationTimestamp();
+ }));
+
+ assertThat(server1LocalExpirtionTimestamp).isEqualTo(server2LocalExpirationTimestamp);
+ }
+ }
+
+ private static Object getLocalData(String key,
+ Function<Region<ByteArrayWrapper, RedisData>, Object> func) {
+ InternalCache cache = ClusterStartupRule.getCache();
+ Region<ByteArrayWrapper, RedisData> region = cache.getRegion("__REDIS_DATA");
+ Region<ByteArrayWrapper, RedisData> localRegion =
+ PartitionRegionHelper.getLocalData(region);
+
+ return func.apply(localRegion);
+ }
+
+ private Map<String, String> makeHashMap(int hashSize, String baseFieldName,
+ String baseValueName) {
+ Map<String, String> map = new HashMap<>();
+ for (int i = 0; i < hashSize; i++) {
+ map.put(baseFieldName + i, baseValueName + i);
+ }
+ return map;
+ }
+
+ private List<String> makeMemberList(int setSize, String baseString) {
+ List<String> members = new ArrayList<>();
+ for (int i = 0; i < setSize; i++) {
+ members.add(baseString + i);
+ }
+ return members;
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
index 5f7db4a..431053a 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import org.apache.geode.DataSerializer;
@@ -30,9 +31,10 @@ import org.apache.geode.redis.internal.netty.Coder;
/**
* This class is a wrapper for the any Regions that need to store a byte[]. The only data this an
* instance will store is a byte[] for the data but it is also serializable and comparable so it is
- * able to be used in querying
+ * able to be used in querying. Class is also marked as Serializable for test support.
*/
-public class ByteArrayWrapper implements DataSerializableFixedID, Comparable<ByteArrayWrapper> {
+public class ByteArrayWrapper
+ implements DataSerializableFixedID, Serializable, Comparable<ByteArrayWrapper> {
/**
* The data portion of ValueWrapper
*/
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
index fd7824e..498ebe4 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Region;
import org.apache.geode.redis.internal.executor.set.RedisSetCommands;
import org.apache.geode.redis.internal.executor.set.RedisSetCommandsFunctionInvoker;
@@ -81,6 +82,7 @@ class NullRedisSet extends RedisSet {
}
@Override
+ @VisibleForTesting
Set<ByteArrayWrapper> smembers() {
// some callers want to be able to modify the set returned
return new HashSet<>();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index 387f0c0..fb5ef52 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Region;
import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
import org.apache.geode.redis.internal.delta.DeltaInfo;
@@ -258,6 +259,7 @@ public class RedisSet extends AbstractRedisData {
*
* @return a set containing all the members in this set
*/
+ @VisibleForTesting
Set<ByteArrayWrapper> smembers() {
return new HashSet<>(members);
}