Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/20 16:21:13 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15549: [BEAM-11997] Changed RedisIO implementation to SDF

lukecwik commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r732945649



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,94 +336,85 @@ public void setup() {
     public void teardown() {
       jedis.close();
     }
-  }
-
-  private static class ReadKeysWithPattern extends BaseReadFn<String> {
 
-    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
-      super(connectionConfiguration);
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element String pattern) {
+      return new OffsetRange(0, getKeyPatters(pattern).size());
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      ScanParams scanParams = new ScanParams();
-      scanParams.match(c.element());
-
-      String cursor = ScanParams.SCAN_POINTER_START;
-      boolean finished = false;
-      while (!finished) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
-        List<String> keys = scanResult.getResult();
-        for (String k : keys) {
-          c.output(k);
+    public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {

Review comment:
   I was under the impression that you wanted to add support for checkpointing only, and didn't want to support generic splitting, since that would require you to generate cursors using the information from https://stackoverflow.com/a/59569053.
   
   In the checkpointing-only case you would do something like:
   ```
   processElement(...) {
     cursor = restriction.getFrom()
     while (true) {
       values, nextCursor = redis.scan(cursor)
       if (!tryClaim(nextCursor)) {  // claims [cursor, nextCursor)
         return STOP
       }
       output values
       if (nextCursor == END) {  // SCAN returns cursor 0 once the iteration is complete
         return STOP
       }
       cursor = nextCursor
     }
   }
   ```
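
   The loop above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `ScanPage`, `KeyScanner` and `CheckpointingScanLoop` are hypothetical names, the `KeyScanner` stands in for a `jedis.scan(cursor, scanParams)` call, the `LongPredicate` stands in for `tracker.tryClaim(...)`, cursors are modeled as `long` values with `0` as the end-of-scan cursor, and the method returns `void` where a real DoFn would return `ProcessContinuation.stop()`.

   ```java
   import java.util.List;
   import java.util.function.Consumer;
   import java.util.function.LongPredicate;

   /** Hypothetical result of one SCAN call: a batch of keys plus the cursor to continue from. */
   final class ScanPage {
     final List<String> keys;
     final long nextCursor;

     ScanPage(List<String> keys, long nextCursor) {
       this.keys = keys;
       this.nextCursor = nextCursor;
     }
   }

   /** Hypothetical stand-in for jedis.scan(cursor, scanParams). */
   interface KeyScanner {
     ScanPage scan(long cursor);
   }

   final class CheckpointingScanLoop {
     static final long END = 0L; // SCAN returns cursor 0 once the iteration is complete

     private CheckpointingScanLoop() {}

     static void run(long startCursor, KeyScanner redis, LongPredicate tryClaim, Consumer<String> output) {
       long cursor = startCursor;
       while (true) {
         ScanPage page = redis.scan(cursor);
         // Claims [cursor, nextCursor); fails once the restriction is exhausted or checkpointed.
         if (!tryClaim.test(page.nextCursor)) {
           return; // this page is not output; a residual starting at `cursor` rescans it
         }
         page.keys.forEach(output);
         if (page.nextCursor == END) {
           return; // the full iteration has finished
         }
         cursor = page.nextCursor;
       }
     }
   }
   ```

   Because a refused claim discards the page that was just fetched, nothing is output twice by the loop itself; any duplicates come from SCAN.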
   
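   You would be claiming ranges `[cursor, nextCursor)` (or `[cursor, END]` if `nextCursor == END`). `tryClaim` would fail if `cursor >= end`, where `end` is the end of the current restriction. Since SCAN cursors do not increase numerically (Redis walks its hash table in reverse-binary order), this still requires you to implement a subset of https://stackoverflow.com/a/59569053 so that you can compare `cursor` against the restriction end.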
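
   A minimal sketch of that comparison, assuming Redis's reverse-binary cursor scheme (the next bucket index is obtained by reversing the cursor's bits, incrementing, and reversing back). Under that assumption, bit-reversing two cursors and comparing the results as unsigned 64-bit values orders them by scan progress. `RedisCursorOrder` is a hypothetical helper, not part of Jedis or Beam, and cursor `0` is both the start and the end of a full scan, so callers still have to special-case it.

   ```java
   /** Hypothetical helper that orders Redis SCAN cursors by scan progress. */
   final class RedisCursorOrder {
     private RedisCursorOrder() {}

     /** Cursors are unsigned 64-bit decimal strings, as Jedis passes them around. */
     static long position(String cursor) {
       // Assumes reverse-binary iteration: the bit-reversed cursor grows as the scan advances.
       return Long.reverse(Long.parseUnsignedLong(cursor));
     }

     /** Negative if a is visited before b, zero if they are equal, positive if a comes after b. */
     static int compare(String a, String b) {
       return Long.compareUnsigned(position(a), position(b));
     }
   }
   ```

   For example, with a table of 8 buckets the cursor advances through 0, 4, 2, 6, 1, 5, 3, 7 and back to 0, and `compare` ranks 0 through 7 in exactly that order.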
   
   Note that since you can't control which cursor SCAN returns next, you will output duplicates and will need a deduplication transform after the read.
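
   In a pipeline this would typically be Beam's `Distinct` transform (e.g. `Distinct.create()`) applied to the keys produced by the read. The in-memory sketch below only illustrates the intended semantics, keeping the first occurrence of each key across SCAN batches; `DedupSketch` is a hypothetical name, and unlike `Distinct` it assumes all keys fit on one machine.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   final class DedupSketch {
     private DedupSketch() {}

     /** Flattens SCAN batches, keeping only the first occurrence of each key (insertion order kept). */
     static List<String> dedupe(List<List<String>> batches) {
       Set<String> seen = new LinkedHashSet<>();
       for (List<String> batch : batches) {
         seen.addAll(batch); // keys already present are ignored
       }
       return new ArrayList<>(seen);
     }
   }
   ```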
   
   When `tryClaim(nextCursor)` succeeds, it would update its restriction from `[from, end]` to `[nextCursor, end]`. A restriction would be considered done when `from >= end`, except for the special case `[0, 0]`, where `0` represents both the start and the end of the scan. Checkpointing would be allowed if `from != end`; it would make the current restriction `[from, from]` (effectively an empty range) and return the residual range `[from, end]`.
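
   A minimal model of these state transitions, written as a hypothetical plain-Java class (`ScanCursorTracker`) rather than against Beam's `RestrictionTracker` API. Restrictions are `{from, end}` pairs of `long` cursors, `0` (`END`) is both the start and the end of a scan, and progress is compared with the bit-reversed ordering, which assumes Redis's reverse-binary cursor scheme.

   ```java
   /** Hypothetical checkpoint-only tracker over Redis SCAN cursors (not Beam's RestrictionTracker). */
   final class ScanCursorTracker {
     static final long END = 0L; // cursor 0: start of a full scan, and what SCAN returns when done

     private long from;          // next cursor to pass to SCAN
     private long end;           // end of the restriction; END means "until SCAN reports completion"
     private boolean reachedEnd; // claiming END turns the restriction back into [0, 0], so track it

     ScanCursorTracker(long from, long end) {
       this.from = from;
       this.end = end;
     }

     /** Claims [from, nextCursor) and moves the restriction to [nextCursor, end]. */
     boolean tryClaim(long nextCursor) {
       if (isDone()) {
         return false; // from >= end: nothing left to claim
       }
       from = nextCursor;
       reachedEnd = nextCursor == END;
       return true;
     }

     /** True when from >= end in scan order; the initial [0, 0] means the whole scan, not empty. */
     boolean isDone() {
       if (reachedEnd) {
         return true;
       }
       if (end == END) {
         return false; // only END itself is at or beyond END
       }
       return Long.compareUnsigned(Long.reverse(from), Long.reverse(end)) >= 0;
     }

     /** Shrinks the current restriction to [from, from] and returns the residual [from, end]. */
     long[] checkpoint() {
       if (from == END || isDone()) {
         return null; // before the first claim, or after END was claimed
       }
       long[] residual = {from, end};
       end = from;
       return residual;
     }

     long[] currentRestriction() {
       return new long[] {from, end};
     }
   }
   ```

   In checkpoint-only mode `end` is always `END`, so refusing to checkpoint while `from == 0` matches the `from != end` rule, and it also keeps the primary restriction from ever becoming an ambiguous `[0, 0]`.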




-- 
This is an automated message from the Apache Git Service.