You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/13 20:14:57 UTC
[beam] branch master updated: Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2aa24dc3758 Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF
2aa24dc3758 is described below
commit 2aa24dc3758750bd76c024dc787c06b91fd725ed
Author: MiguelAnzoWizeline <69...@users.noreply.github.com>
AuthorDate: Wed Apr 13 15:14:51 2022 -0500
Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF
* Changed RedisIO implementation to SDF
* Remove commented code
* Fixed style
* Updated RedisIO split logic
* Updated RedisIO SDF implementation
* Changed RedisCursor, range and tracker implementation to use an internal ByteKey along with the String cursor
* Changed RedisCursor utility functions to static
* Suppress nullness warning
* Fixing comparison values to avoid warnings
* Fix spotbugs issues
* Fix spotbugs issues
* Fixed check style
* Added different values for Redis start and end cursors, simplified RedisIO read logic, added unit tests
* Moved util functions from RedisCursor to RedisCursorRangeTracker, updated RedisCursor compareTo logic
* Removed commented code
* Fixed style warning
* Changed tracker implementation to ByteKey in RedisIO
* Changed deprecated Reshuffle to Distinct in RedisIO
* Added comments to document redis cursor functions and scan result guarantees
* Using Latest.perKey instead of Distinct.create
* [BEAM-11997] Fix unusedVariable warning
* [BEAM-11997] Fix SpotlessApply
* [BEAM-11997] Add case in Latest CombineFn when input and accumulator have the same timestamp
* [BEAM-11997] Fix checkstyle error
Co-authored-by: Fernando Morales <fe...@wizeline.com>
Co-authored-by: Benjamin Gonzalez <be...@wizeline.com>
---
.../org/apache/beam/sdk/transforms/Latest.java | 4 +-
.../org/apache/beam/sdk/io/redis/RedisCursor.java | 161 +++++++++++++++++++++
.../java/org/apache/beam/sdk/io/redis/RedisIO.java | 121 ++++------------
.../org/apache/beam/sdk/io/redis/RedisIOTest.java | 69 +++++++++
4 files changed, 263 insertions(+), 92 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index ee4017d45c5..892286b60c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -109,8 +109,10 @@ public class Latest {
if (input.getTimestamp().isBefore(accumulator.getTimestamp())) {
return accumulator;
- } else {
+ } else if (input.getTimestamp().isAfter(accumulator.getTimestamp())) {
return input;
+ } else {
+ return accumulator.getValue() != null ? accumulator : input;
}
}
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java
new file mode 100644
index 00000000000..be93cae3b98
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class RedisCursor implements Comparable<RedisCursor>, Serializable {
+
+ private final String cursor;
+ private final long dbSize;
+ private final boolean isStart;
+ private static BigEndianLongCoder coder = BigEndianLongCoder.of();
+
+ public static final ByteKey ZERO_KEY = ByteKey.of(0x00);
+ // The zero cursor that represents the start position is represented as ByteKey.of(0x00)
+ public static final RedisCursor ZERO_CURSOR = RedisCursor.of("0", 8, true);
+ // The zero cursor that represents the end position is represented as ByteKey.EMPTY
+ public static final RedisCursor END_CURSOR = RedisCursor.of("0", 8, false);
+
+ public static RedisCursor of(String cursor, long dbSize, boolean isStart) {
+ return new RedisCursor(cursor, dbSize, isStart);
+ }
+
+ private RedisCursor(String cursor, long dbSize, boolean isStart) {
+ this.cursor = cursor;
+ this.dbSize = dbSize;
+ this.isStart = isStart;
+ }
+
+ /**
+ * {@link RedisCursor} implements {@link Comparable Comparable&lt;RedisCursor&gt;} by comparing the
+ * numeric cursor values; when both cursors are "0", the start cursor orders before the end cursor.
+ */
+ @Override
+ public int compareTo(@Nonnull RedisCursor other) {
+ checkNotNull(other, "other");
+ if ("0".equals(cursor) && "0".equals(other.cursor)) {
+ if (isStart && !other.isStart()) {
+ return -1;
+ } else if (!isStart && other.isStart()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ return Long.compare(Long.parseLong(cursor), Long.parseLong(other.cursor));
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RedisCursor that = (RedisCursor) o;
+ return dbSize == that.dbSize && isStart == that.isStart && Objects.equals(cursor, that.cursor);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cursor, dbSize, isStart);
+ }
+
+ public String getCursor() {
+ return cursor;
+ }
+
+ public long getDbSize() {
+ return dbSize;
+ }
+
+ public boolean isStart() {
+ return isStart;
+ }
+
+ /*
+ The functions that convert between a Redis cursor and a ByteKey were written using the following article and
+ its sources as reference; they explain in detail how a Redis cursor is formed and how to transform it into
+ a sequential value that represents the current position of the scan cursor in Redis tables.
+ https://engineering.q42.nl/redis-scan-cursor/
+ */
+ @VisibleForTesting
+ static ByteKey redisCursorToByteKey(RedisCursor cursor) {
+ if ("0".equals(cursor.getCursor())) {
+ if (cursor.isStart()) {
+ return ByteKey.of(0x00);
+ } else {
+ return ByteKey.EMPTY;
+ }
+ }
+ int nBits = getTablePow(cursor.getDbSize());
+ long cursorLong = Long.parseLong(cursor.getCursor());
+ long reversed = shiftBits(cursorLong, nBits);
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ try {
+ coder.encode(reversed, os);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("invalid redis cursor " + cursor);
+ }
+ byte[] byteArray = os.toByteArray();
+ return ByteKey.copyFrom(byteArray);
+ }
+
+ @VisibleForTesting
+ static long shiftBits(long a, int nBits) {
+ long b = 0;
+ for (int i = 0; i < nBits; ++i) {
+ b <<= 1;
+ b |= (a & 1);
+ a >>= 1;
+ }
+ return b;
+ }
+
+ @VisibleForTesting
+ static int getTablePow(long nKeys) {
+ return 64 - Long.numberOfLeadingZeros(nKeys - 1);
+ }
+
+ @VisibleForTesting
+ static RedisCursor byteKeyToRedisCursor(ByteKey byteKeyStart, long nKeys, boolean isStart) {
+ if (byteKeyStart.isEmpty()) {
+ return RedisCursor.of("0", nKeys, false);
+ } else if (byteKeyStart.equals(ByteKey.of(0x00))) {
+ return RedisCursor.of("0", nKeys, true);
+ }
+ int nBits = getTablePow(nKeys);
+ ByteBuffer bb = ByteBuffer.wrap(byteKeyStart.getBytes());
+ long l = bb.getLong();
+ l = shiftBits(l, nBits);
+ return RedisCursor.of(Long.toString(l), nKeys, isStart);
+ }
+}
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 3f89e560f07..3dc6a7af6cd 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -22,28 +22,27 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
@@ -278,7 +277,6 @@ public class RedisIO {
return input
.apply(Create.of(keyPattern()))
- .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
.apply(
RedisIO.readKeyPatterns()
.withConnectionConfiguration(connectionConfiguration())
@@ -357,7 +355,7 @@ public class RedisIO {
checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
PCollection<KV<String, String>> output =
input
- .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
+ .apply(ParDo.of(new ReadFn(connectionConfiguration())))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
if (outputParallelization()) {
output = output.apply(new Reparallelize());
@@ -366,12 +364,13 @@ public class RedisIO {
}
}
- private abstract static class BaseReadFn<T> extends DoFn<String, T> {
- protected final RedisConnectionConfiguration connectionConfiguration;
+ @DoFn.BoundedPerElement
+ private static class ReadFn extends DoFn<String, KV<String, String>> {
+ protected final RedisConnectionConfiguration connectionConfiguration;
transient Jedis jedis;
- BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
+ ReadFn(RedisConnectionConfiguration connectionConfiguration) {
this.connectionConfiguration = connectionConfiguration;
}
@@ -384,95 +383,33 @@ public class RedisIO {
public void teardown() {
jedis.close();
}
- }
-
- private static class ReadKeysWithPattern extends BaseReadFn<String> {
- ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
- super(connectionConfiguration);
+ @GetInitialRestriction
+ public ByteKeyRange getInitialRestriction() {
+ return ByteKeyRange.of(ByteKey.of(0x00), ByteKey.EMPTY);
}
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(
+ ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) {
+ ByteKey cursor = tracker.currentRestriction().getStartKey();
+ RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(cursor, jedis.dbSize(), true);
ScanParams scanParams = new ScanParams();
scanParams.match(c.element());
-
- String cursor = ScanParams.SCAN_POINTER_START;
- boolean finished = false;
- while (!finished) {
- ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
- List<String> keys = scanResult.getResult();
- for (String k : keys) {
- c.output(k);
- }
- cursor = scanResult.getCursor();
- if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
- finished = true;
- }
- }
- }
- }
-
- /** A {@link DoFn} requesting Redis server to get key/value pairs. */
- private static class ReadFn extends BaseReadFn<KV<String, String>> {
- transient @Nullable Multimap<BoundedWindow, String> bundles = null;
- @Nullable AtomicInteger batchCount = null;
- private final int batchSize;
-
- ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
- super(connectionConfiguration);
- this.batchSize = batchSize;
- }
-
- @StartBundle
- public void startBundle() {
- bundles = ArrayListMultimap.create();
- batchCount = new AtomicInteger();
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- String key = c.element();
- bundles.put(window, key);
- if (batchCount.incrementAndGet() > getBatchSize()) {
- Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
- for (BoundedWindow w : kvs.keySet()) {
- for (KV<String, String> kv : kvs.get(w)) {
- c.output(kv);
- }
- }
- }
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context) {
- Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
- for (BoundedWindow w : kvs.keySet()) {
- for (KV<String, String> kv : kvs.get(w)) {
- context.output(kv, w.maxTimestamp(), w);
- }
- }
- }
-
- private int getBatchSize() {
- return batchSize;
- }
-
- private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
- Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
- for (BoundedWindow w : bundles.keySet()) {
- String[] keys = new String[bundles.get(w).size()];
- bundles.get(w).toArray(keys);
- List<String> results = jedis.mget(keys);
- for (int i = 0; i < results.size(); i++) {
- if (results.get(i) != null) {
- kvs.put(w, KV.of(keys[i], results.get(i)));
+ while (tracker.tryClaim(cursor)) {
+ ScanResult<String> scanResult = jedis.scan(redisCursor.getCursor(), scanParams);
+ if (scanResult.getResult().size() > 0) {
+ String[] keys = scanResult.getResult().toArray(new String[scanResult.getResult().size()]);
+ List<String> results = jedis.mget(keys);
+ for (int i = 0; i < results.size(); i++) {
+ if (results.get(i) != null) {
+ c.output(KV.of(keys[i], results.get(i)));
+ }
}
}
+ redisCursor = RedisCursor.of(scanResult.getCursor(), jedis.dbSize(), false);
+ cursor = RedisCursor.redisCursorToByteKey(redisCursor);
}
- bundles = ArrayListMultimap.create();
- batchCount.set(0);
- return kvs;
}
}
@@ -497,7 +434,9 @@ public class RedisIO {
}
})
.withSideInputs(empty));
- return materialized.apply(Reshuffle.viaRandomKey());
+ /* Redis Scan may return a given element multiple times, so we use the Latest.perKey() transform to remove duplicates,
+ see "Scan guarantees" in https://redis.io/commands/scan */
+ return materialized.apply(Latest.perKey());
}
}
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index b3d308e02fa..10be1741cab 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -100,6 +101,38 @@ public class RedisIOTest {
p.run();
}
+ @Test
+ public void testReadSplitBig() {
+ List<KV<String, String>> data = buildIncrementalData("bigset", 1000);
+ data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
+ PCollection<KV<String, String>> read =
+ p.apply(
+ "Read",
+ RedisIO.read()
+ .withEndpoint(REDIS_HOST, port)
+ .withKeyPattern("bigset*")
+ .withBatchSize(8));
+ PAssert.that(read).containsInAnyOrder(data);
+ p.run();
+ }
+
+ @Test
+ public void testReadSplitSmall() {
+ List<KV<String, String>> data = buildIncrementalData("smallset", 5);
+ data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
+ PCollection<KV<String, String>> read =
+ p.apply(
+ "Read",
+ RedisIO.read()
+ .withEndpoint(REDIS_HOST, port)
+ .withKeyPattern("smallset*")
+ .withBatchSize(20));
+ PAssert.that(read).containsInAnyOrder(data);
+ p.run();
+ }
+
@Test
public void testReadWithKeyPattern() {
List<KV<String, String>> data = buildIncrementalData("pattern", 10);
@@ -331,6 +364,42 @@ public class RedisIOTest {
}
}
+ @Test
+ public void redisCursorToByteKey() {
+ RedisCursor redisCursor = RedisCursor.of("80", 200, true);
+ ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+ assertEquals(ByteKey.of(0, 0, 0, 0, 0, 0, 0, 10), byteKey);
+ }
+
+ @Test
+ public void redisCursorToByteKeyZeroStart() {
+ RedisCursor redisCursor = RedisCursor.of("0", 200, true);
+ ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+ assertEquals(RedisCursor.ZERO_KEY, byteKey);
+ }
+
+ @Test
+ public void redisCursorToByteKeyZeroEnd() {
+ RedisCursor redisCursor = RedisCursor.of("0", 200, false);
+ ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+ assertEquals(ByteKey.EMPTY, byteKey);
+ }
+
+ @Test
+ public void redisCursorToByteKeyAndBack() {
+ RedisCursor redisCursor = RedisCursor.of("80", 200, true);
+ ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+ RedisCursor result = RedisCursor.byteKeyToRedisCursor(byteKey, 200, true);
+ assertEquals(redisCursor.getCursor(), result.getCursor());
+ }
+
+ @Test
+ public void redisByteKeyToRedisCursor() {
+ ByteKey bytes = ByteKey.of(0, 0, 0, 0, 0, 25, 68, 103);
+ RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(bytes, 1048586, true);
+ assertEquals("1885267", redisCursor.getCursor());
+ }
+
private static List<KV<String, String>> buildConstantKeyList(String key, List<String> values) {
List<KV<String, String>> data = new ArrayList<>();
for (String value : values) {