You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/14 21:10:46 UTC

[GitHub] [beam] fernando-wizeline commented on a change in pull request #15549: [BEAM-11997] Changed RedisIO implementation to SDF

fernando-wizeline commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r826382014



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,95 +337,33 @@ public void setup() {
     public void teardown() {
       jedis.close();
     }
-  }
-
-  private static class ReadKeysWithPattern extends BaseReadFn<String> {
 
-    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
-      super(connectionConfiguration);
+    @GetInitialRestriction
+    public ByteKeyRange getInitialRestriction() {
+      return ByteKeyRange.of(ByteKey.of(0x00), ByteKey.EMPTY);
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
+    public void processElement(
+        ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) {
+      ByteKey cursor = tracker.currentRestriction().getStartKey();
+      RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(cursor, jedis.dbSize(), true);
       ScanParams scanParams = new ScanParams();
       scanParams.match(c.element());
-
-      String cursor = ScanParams.SCAN_POINTER_START;
-      boolean finished = false;
-      while (!finished) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
-        List<String> keys = scanResult.getResult();
-        for (String k : keys) {
-          c.output(k);
-        }
-        cursor = scanResult.getCursor();
-        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
-          finished = true;
-        }
-      }
-    }
-  }
-
-  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
-  private static class ReadFn extends BaseReadFn<KV<String, String>> {
-    transient @Nullable Multimap<BoundedWindow, String> bundles = null;
-    @Nullable AtomicInteger batchCount = null;
-    private final int batchSize;
-
-    ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
-      super(connectionConfiguration);
-      this.batchSize = batchSize;
-    }
-
-    @StartBundle
-    public void startBundle() {
-      bundles = ArrayListMultimap.create();
-      batchCount = new AtomicInteger();
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      String key = c.element();
-      bundles.put(window, key);
-      if (batchCount.incrementAndGet() > getBatchSize()) {
-        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-        for (BoundedWindow w : kvs.keySet()) {
-          for (KV<String, String> kv : kvs.get(w)) {
-            c.output(kv);
-          }
-        }
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext context) {
-      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-      for (BoundedWindow w : kvs.keySet()) {
-        for (KV<String, String> kv : kvs.get(w)) {
-          context.output(kv, w.maxTimestamp(), w);
-        }
-      }
-    }
-
-    private int getBatchSize() {
-      return batchSize;
-    }
-
-    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
-      Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
-      for (BoundedWindow w : bundles.keySet()) {
-        String[] keys = new String[bundles.get(w).size()];
-        bundles.get(w).toArray(keys);
-        List<String> results = jedis.mget(keys);
-        for (int i = 0; i < results.size(); i++) {
-          if (results.get(i) != null) {
-            kvs.put(w, KV.of(keys[i], results.get(i)));
+      while (tracker.tryClaim(cursor)) {
+        ScanResult<String> scanResult = jedis.scan(redisCursor.getCursor(), scanParams);
+        if (scanResult.getResult().size() > 0) {
+          String[] keys = scanResult.getResult().toArray(new String[scanResult.getResult().size()]);
+          List<String> results = jedis.mget(keys);
+          for (int i = 0; i < results.size(); i++) {
+            if (results.get(i) != null) {
+              c.output(KV.of(keys[i], results.get(i)));
+            }
           }
         }
+        redisCursor = RedisCursor.of(scanResult.getCursor(), jedis.dbSize(), false);

Review comment:
      Yeah, that is correct; we're handling this case properly.
   It is handled by the `tryClaim` call in lines 353–361, since `ByteKey` has a special case for the 0 value at the end of the cursor.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org