You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2020/06/20 20:05:42 UTC

[beam] 03/03: --wip-- [skip ci]

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 98639d1685332700db7f163e614d63f02c955beb
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Wed Apr 15 19:49:15 2020 +0200

    --wip-- [skip ci]
---
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 85 +++++++++++++++++++++-
 1 file changed, 84 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 2757f2e..0c90ad8 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -131,6 +131,11 @@ public class RedisIO {
         .build();
   }
 
+  /** Like {@link #read()} but executes every {@link Read} contained in a {@link PCollection}. */
+  public static ReadAll readAll() {
+    return new ReadAll();
+  }
+
   /** Write data to a Redis server. */
   public static Write write() {
     return new AutoValue_RedisIO_Write.Builder()
@@ -315,12 +320,90 @@ public class RedisIO {
     }
   }
 
+  /** Implementation of {@link #readAll()}: runs every {@link Read} spec of the input. */
+  public static class ReadAll
+      extends PTransform<PCollection<Read>, PCollection<KV<String, String>>> {
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<Read> input) {
+      // Redistribute the (possibly few) specs across workers instead of fusing them upstream.
+      return input
+          .apply("Reshuffle", Reshuffle.viaRandomKey())
+          .apply("Read", ParDo.of(new ReadFn()));
+    }
+  }
+
+  /**
+   * Executes each {@link Read} in the input: SCANs the keys matching {@code keyPattern()} and
+   * fetches their values with MGET in batches of at most {@code batchSize()} keys.
+   */
+  private static class ReadFn extends DoFn<Read, KV<String, String>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Read spec = c.element();
+      // Every spec carries its own connection configuration, so connect per element and make
+      // sure the connection is closed even if SCAN/MGET fails.
+      try (Jedis jedis = spec.connectionConfiguration().connect()) {
+        ScanParams scanParams = new ScanParams();
+        scanParams.match(spec.keyPattern());
+        List<String> batch = new java.util.ArrayList<>();
+        String cursor = ScanParams.SCAN_POINTER_START;
+        // A full SCAN iteration ends when the server hands back the start cursor again.
+        do {
+          ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+          for (String key : scanResult.getResult()) {
+            batch.add(key);
+            if (batch.size() >= spec.batchSize()) {
+              fetchAndOutput(jedis, batch, c);
+            }
+          }
+          cursor = scanResult.getCursor();
+        } while (!cursor.equals(ScanParams.SCAN_POINTER_START));
+        // Flush the remaining keys of the last, possibly partial, batch.
+        fetchAndOutput(jedis, batch, c);
+      }
+    }
+
+    /**
+     * Fetches the values of {@code keys} with a single MGET, outputs each key whose value is
+     * non-null, then clears {@code keys} so the caller can refill it.
+     */
+    private void fetchAndOutput(Jedis jedis, List<String> keys, ProcessContext c) {
+      if (keys.isEmpty()) {
+        return;
+      }
+      List<String> values = jedis.mget(keys.toArray(new String[0]));
+      for (int i = 0; i < values.size(); i++) {
+        String value = values.get(i);
+        // MGET returns null for keys that no longer exist (e.g. deleted after the SCAN).
+        if (value != null) {
+          c.output(KV.of(keys.get(i), value));
+        }
+      }
+      keys.clear();
+    }
+  }
+
   private abstract static class BaseReadFn<T> extends DoFn<String, T> {
     protected final RedisConnectionConfiguration connectionConfiguration;
 
     transient Jedis jedis;
 
-    BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    BaseReadFn() {
       this.connectionConfiguration = connectionConfiguration;
     }