You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/13 20:14:57 UTC

[beam] branch master updated: Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aa24dc3758 Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF
2aa24dc3758 is described below

commit 2aa24dc3758750bd76c024dc787c06b91fd725ed
Author: MiguelAnzoWizeline <69...@users.noreply.github.com>
AuthorDate: Wed Apr 13 15:14:51 2022 -0500

    Merge pull request #15549 from [BEAM-11997] Changed RedisIO implementation to SDF
    
    * Changed RedisIO implementation to SDF
    
    * Remove commented code
    
    * Fixed style
    
    * Updated RedisIO split logic
    
    * Updated RedisIO SDF implementation
    
    * Changed RedisCursor, range and tracker implementation to use an internal ByteKey along with the String cursor
    
    * Changed RedisCursor utility functions to static
    
    * Suppress nullness warning
    
    * Fixing comparison values to avoid warnings
    
    * Fix spotbugs issues
    
    * Fix spotbugs issues
    
    * Fixed check style
    
    * Added different value for Redis start and end cursors, simplified RedisIO read logic, added unit tests
    
    * Moved util functions from RedisCursor to RedisCursorRangeTracker, updated RedisCursor compareTo logic
    
    * Removed commented code
    
    * Fixed style warning
    
    * Changed tracker implementation to ByteKey in RedisIO
    
    * Changed deprecated Reshuffle to Distinct in RedisIO
    
    * Added comments to document redis cursor functions and scan result guarantees
    
    * Using Latest.perKey instead of Distinct.create
    
    * [BEAM-11997] Fix unusedVariable warning
    
    * [BEAM-11997] Fix SpotlessApply
    
    * [BEAM-11997] Add case in Latest CombineFn when input and accumulator have the same timestamp
    
    * [BEAM-11997] Fix checkstyle error
    
    Co-authored-by: Fernando Morales <fe...@wizeline.com>
    Co-authored-by: Benjamin Gonzalez <be...@wizeline.com>
---
 .../org/apache/beam/sdk/transforms/Latest.java     |   4 +-
 .../org/apache/beam/sdk/io/redis/RedisCursor.java  | 161 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 121 ++++------------
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java  |  69 +++++++++
 4 files changed, 263 insertions(+), 92 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index ee4017d45c5..892286b60c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -109,8 +109,10 @@ public class Latest {
 
       if (input.getTimestamp().isBefore(accumulator.getTimestamp())) {
         return accumulator;
-      } else {
+      } else if (input.getTimestamp().isAfter(accumulator.getTimestamp())) {
         return input;
+      } else {
+        return accumulator.getValue() != null ? accumulator : input;
       }
     }
 
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java
new file mode 100644
index 00000000000..be93cae3b98
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class RedisCursor implements Comparable<RedisCursor>, Serializable {
+
+  private final String cursor;
+  private final long dbSize;
+  private final boolean isStart;
+  private static BigEndianLongCoder coder = BigEndianLongCoder.of();
+
+  public static final ByteKey ZERO_KEY = ByteKey.of(0x00);
+  // The zero cursor that represents the start position is represented as ByteKey.of(0x00)
+  public static final RedisCursor ZERO_CURSOR = RedisCursor.of("0", 8, true);
+  // The zero cursor that represents the end position is represented as ByteKey.EMPTY
+  public static final RedisCursor END_CURSOR = RedisCursor.of("0", 8, false);
+
+  public static RedisCursor of(String cursor, long dbSize, boolean isStart) {
+    return new RedisCursor(cursor, dbSize, isStart);
+  }
+
+  private RedisCursor(String cursor, long dbSize, boolean isStart) {
+    this.cursor = cursor;
+    this.dbSize = dbSize;
+    this.isStart = isStart;
+  }
+
+  /**
+   * {@link RedisCursor} implements {@link Comparable Comparable&lt;RedisCursor&gt;} by transforming
+   * the cursors to an index of the Redis table.
+   */
+  @Override
+  public int compareTo(@Nonnull RedisCursor other) {
+    checkNotNull(other, "other");
+    if ("0".equals(cursor) && "0".equals(other.cursor)) {
+      if (isStart && !other.isStart()) {
+        return -1;
+      } else if (!isStart && other.isStart()) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    return Long.compare(Long.parseLong(cursor), Long.parseLong(other.cursor));
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RedisCursor that = (RedisCursor) o;
+    return dbSize == that.dbSize && isStart == that.isStart && Objects.equals(cursor, that.cursor);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(cursor, dbSize, isStart);
+  }
+
+  public String getCursor() {
+    return cursor;
+  }
+
+  public long getDbSize() {
+    return dbSize;
+  }
+
+  public boolean isStart() {
+    return isStart;
+  }
+
+  /*
+   In order to write the functions that convert between a Redis cursor and a ByteKey, the following article and
+   its sources were used as reference, as they explain in detail how a Redis cursor is formed and how to transform it to
+   a sequential value that represents the current position of the scan cursor in Redis tables.
+   https://engineering.q42.nl/redis-scan-cursor/
+  */
+  @VisibleForTesting
+  static ByteKey redisCursorToByteKey(RedisCursor cursor) {
+    if ("0".equals(cursor.getCursor())) {
+      if (cursor.isStart()) {
+        return ByteKey.of(0x00);
+      } else {
+        return ByteKey.EMPTY;
+      }
+    }
+    int nBits = getTablePow(cursor.getDbSize());
+    long cursorLong = Long.parseLong(cursor.getCursor());
+    long reversed = shiftBits(cursorLong, nBits);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    try {
+      coder.encode(reversed, os);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("invalid redis cursor " + cursor);
+    }
+    byte[] byteArray = os.toByteArray();
+    return ByteKey.copyFrom(byteArray);
+  }
+
+  @VisibleForTesting
+  static long shiftBits(long a, int nBits) {
+    long b = 0;
+    for (int i = 0; i < nBits; ++i) {
+      b <<= 1;
+      b |= (a & 1);
+      a >>= 1;
+    }
+    return b;
+  }
+
+  @VisibleForTesting
+  static int getTablePow(long nKeys) {
+    return 64 - Long.numberOfLeadingZeros(nKeys - 1);
+  }
+
+  @VisibleForTesting
+  static RedisCursor byteKeyToRedisCursor(ByteKey byteKeyStart, long nKeys, boolean isStart) {
+    if (byteKeyStart.isEmpty()) {
+      return RedisCursor.of("0", nKeys, false);
+    } else if (byteKeyStart.equals(ByteKey.of(0x00))) {
+      return RedisCursor.of("0", nKeys, true);
+    }
+    int nBits = getTablePow(nKeys);
+    ByteBuffer bb = ByteBuffer.wrap(byteKeyStart.getBytes());
+    long l = bb.getLong();
+    l = shiftBits(l, nBits);
+    return RedisCursor.of(Long.toString(l), nKeys, isStart);
+  }
+}
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 3f89e560f07..3dc6a7af6cd 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -22,28 +22,27 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import com.google.auto.value.AutoValue;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Latest;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.StreamEntryID;
@@ -278,7 +277,6 @@ public class RedisIO {
 
       return input
           .apply(Create.of(keyPattern()))
-          .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
           .apply(
               RedisIO.readKeyPatterns()
                   .withConnectionConfiguration(connectionConfiguration())
@@ -357,7 +355,7 @@ public class RedisIO {
       checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
       PCollection<KV<String, String>> output =
           input
-              .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
+              .apply(ParDo.of(new ReadFn(connectionConfiguration())))
               .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
       if (outputParallelization()) {
         output = output.apply(new Reparallelize());
@@ -366,12 +364,13 @@ public class RedisIO {
     }
   }
 
-  private abstract static class BaseReadFn<T> extends DoFn<String, T> {
-    protected final RedisConnectionConfiguration connectionConfiguration;
+  @DoFn.BoundedPerElement
+  private static class ReadFn extends DoFn<String, KV<String, String>> {
 
+    protected final RedisConnectionConfiguration connectionConfiguration;
     transient Jedis jedis;
 
-    BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
       this.connectionConfiguration = connectionConfiguration;
     }
 
@@ -384,95 +383,33 @@ public class RedisIO {
     public void teardown() {
       jedis.close();
     }
-  }
-
-  private static class ReadKeysWithPattern extends BaseReadFn<String> {
 
-    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
-      super(connectionConfiguration);
+    @GetInitialRestriction
+    public ByteKeyRange getInitialRestriction() {
+      return ByteKeyRange.of(ByteKey.of(0x00), ByteKey.EMPTY);
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
+    public void processElement(
+        ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) {
+      ByteKey cursor = tracker.currentRestriction().getStartKey();
+      RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(cursor, jedis.dbSize(), true);
       ScanParams scanParams = new ScanParams();
       scanParams.match(c.element());
-
-      String cursor = ScanParams.SCAN_POINTER_START;
-      boolean finished = false;
-      while (!finished) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
-        List<String> keys = scanResult.getResult();
-        for (String k : keys) {
-          c.output(k);
-        }
-        cursor = scanResult.getCursor();
-        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
-          finished = true;
-        }
-      }
-    }
-  }
-
-  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
-  private static class ReadFn extends BaseReadFn<KV<String, String>> {
-    transient @Nullable Multimap<BoundedWindow, String> bundles = null;
-    @Nullable AtomicInteger batchCount = null;
-    private final int batchSize;
-
-    ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
-      super(connectionConfiguration);
-      this.batchSize = batchSize;
-    }
-
-    @StartBundle
-    public void startBundle() {
-      bundles = ArrayListMultimap.create();
-      batchCount = new AtomicInteger();
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      String key = c.element();
-      bundles.put(window, key);
-      if (batchCount.incrementAndGet() > getBatchSize()) {
-        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-        for (BoundedWindow w : kvs.keySet()) {
-          for (KV<String, String> kv : kvs.get(w)) {
-            c.output(kv);
-          }
-        }
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext context) {
-      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-      for (BoundedWindow w : kvs.keySet()) {
-        for (KV<String, String> kv : kvs.get(w)) {
-          context.output(kv, w.maxTimestamp(), w);
-        }
-      }
-    }
-
-    private int getBatchSize() {
-      return batchSize;
-    }
-
-    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
-      Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
-      for (BoundedWindow w : bundles.keySet()) {
-        String[] keys = new String[bundles.get(w).size()];
-        bundles.get(w).toArray(keys);
-        List<String> results = jedis.mget(keys);
-        for (int i = 0; i < results.size(); i++) {
-          if (results.get(i) != null) {
-            kvs.put(w, KV.of(keys[i], results.get(i)));
+      while (tracker.tryClaim(cursor)) {
+        ScanResult<String> scanResult = jedis.scan(redisCursor.getCursor(), scanParams);
+        if (scanResult.getResult().size() > 0) {
+          String[] keys = scanResult.getResult().toArray(new String[scanResult.getResult().size()]);
+          List<String> results = jedis.mget(keys);
+          for (int i = 0; i < results.size(); i++) {
+            if (results.get(i) != null) {
+              c.output(KV.of(keys[i], results.get(i)));
+            }
           }
         }
+        redisCursor = RedisCursor.of(scanResult.getCursor(), jedis.dbSize(), false);
+        cursor = RedisCursor.redisCursorToByteKey(redisCursor);
       }
-      bundles = ArrayListMultimap.create();
-      batchCount.set(0);
-      return kvs;
     }
   }
 
@@ -497,7 +434,9 @@ public class RedisIO {
                         }
                       })
                   .withSideInputs(empty));
-      return materialized.apply(Reshuffle.viaRandomKey());
+      /* Redis Scan may return a given element multiple times, so we use the Latest.perKey() transform to remove duplicates,
+      see "Scan guarantees" in https://redis.io/commands/scan */
+      return materialized.apply(Latest.perKey());
     }
   }
 
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index b3d308e02fa..10be1741cab 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -100,6 +101,38 @@ public class RedisIOTest {
     p.run();
   }
 
+  @Test
+  public void testReadSplitBig() {
+    List<KV<String, String>> data = buildIncrementalData("bigset", 1000);
+    data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
+    PCollection<KV<String, String>> read =
+        p.apply(
+            "Read",
+            RedisIO.read()
+                .withEndpoint(REDIS_HOST, port)
+                .withKeyPattern("bigset*")
+                .withBatchSize(8));
+    PAssert.that(read).containsInAnyOrder(data);
+    p.run();
+  }
+
+  @Test
+  public void testReadSplitSmall() {
+    List<KV<String, String>> data = buildIncrementalData("smallset", 5);
+    data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
+    PCollection<KV<String, String>> read =
+        p.apply(
+            "Read",
+            RedisIO.read()
+                .withEndpoint(REDIS_HOST, port)
+                .withKeyPattern("smallset*")
+                .withBatchSize(20));
+    PAssert.that(read).containsInAnyOrder(data);
+    p.run();
+  }
+
   @Test
   public void testReadWithKeyPattern() {
     List<KV<String, String>> data = buildIncrementalData("pattern", 10);
@@ -331,6 +364,42 @@ public class RedisIOTest {
     }
   }
 
+  @Test
+  public void redisCursorToByteKey() {
+    RedisCursor redisCursor = RedisCursor.of("80", 200, true);
+    ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+    assertEquals(ByteKey.of(0, 0, 0, 0, 0, 0, 0, 10), byteKey);
+  }
+
+  @Test
+  public void redisCursorToByteKeyZeroStart() {
+    RedisCursor redisCursor = RedisCursor.of("0", 200, true);
+    ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+    assertEquals(RedisCursor.ZERO_KEY, byteKey);
+  }
+
+  @Test
+  public void redisCursorToByteKeyZeroEnd() {
+    RedisCursor redisCursor = RedisCursor.of("0", 200, false);
+    ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+    assertEquals(ByteKey.EMPTY, byteKey);
+  }
+
+  @Test
+  public void redisCursorToByteKeyAndBack() {
+    RedisCursor redisCursor = RedisCursor.of("80", 200, true);
+    ByteKey byteKey = RedisCursor.redisCursorToByteKey(redisCursor);
+    RedisCursor result = RedisCursor.byteKeyToRedisCursor(byteKey, 200, true);
+    assertEquals(redisCursor.getCursor(), result.getCursor());
+  }
+
+  @Test
+  public void redisByteKeyToRedisCursor() {
+    ByteKey bytes = ByteKey.of(0, 0, 0, 0, 0, 25, 68, 103);
+    RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(bytes, 1048586, true);
+    assertEquals("1885267", redisCursor.getCursor());
+  }
+
   private static List<KV<String, String>> buildConstantKeyList(String key, List<String> values) {
     List<KV<String, String>> data = new ArrayList<>();
     for (String value : values) {