You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2020/06/20 20:05:39 UTC

[beam] branch BEAM-9403-redisio-readall-backup created (now 98639d1)

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 98639d1  --wip-- [skip ci]

This branch includes the following new commits:

     new 5ae8173  Deprecate RedisIO.readAll transform and add RedisIO.readKeys as a replacement
     new fd95dfc  Remove RedisIO.ReadAll transform based on Key Patterns
     new 98639d1  --wip-- [skip ci]

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 02/03: Remove RedisIO.ReadAll transform based on Key Patterns

Posted by ie...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fd95dfc22efd0a5951894b104cdcffa9d866d9c2
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Apr 13 17:34:52 2020 +0200

    Remove RedisIO.ReadAll transform based on Key Patterns
---
 .../main/java/org/apache/beam/sdk/io/redis/RedisIO.java  | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 77fbb6a..2757f2e 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -131,22 +131,6 @@ public class RedisIO {
         .build();
   }
 
-  /**
-   * Like {@link #read()} but executes multiple instances of the Redis query substituting each
-   * element of a {@link PCollection} as key pattern.
-   *
-   * @deprecated This method is not consistent with the readAll pattern of other transforms and will
-   *     be remove soon. Please update you code to use {@link #readKeyPatterns()} instead.
-   */
-  @Deprecated
-  public static ReadKeyPatterns readAll() {
-    return new AutoValue_RedisIO_ReadKeyPatterns.Builder()
-        .setConnectionConfiguration(RedisConnectionConfiguration.create())
-        .setBatchSize(1000)
-        .setOutputParallelization(true)
-        .build();
-  }
-
   /** Write data to a Redis server. */
   public static Write write() {
     return new AutoValue_RedisIO_Write.Builder()


[beam] 03/03: --wip-- [skip ci]

Posted by ie...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 98639d1685332700db7f163e614d63f02c955beb
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Wed Apr 15 19:49:15 2020 +0200

    --wip-- [skip ci]
---
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 85 +++++++++++++++++++++-
 1 file changed, 84 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 2757f2e..0c90ad8 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -131,6 +131,11 @@ public class RedisIO {
         .build();
   }
 
+  /** Like {@link #read()} but executes multiple reads from a {@link PCollection} as Reads. */
+  public static ReadAll readAll() {
+    return new ReadAll();
+  }
+
   /** Write data to a Redis server. */
   public static Write write() {
     return new AutoValue_RedisIO_Write.Builder()
@@ -315,12 +320,90 @@ public class RedisIO {
     }
   }
 
+  /** Implementation of {@link #readAll()}. */
+  public abstract static class ReadAll
+      extends PTransform<PCollection<Read>, PCollection<KV<String, String>>> {
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<Read> input) {
+      return input
+          //          .apply("Split", ParDo.of(new SplitFn()))
+          .apply("Reshuffle", Reshuffle.viaRandomKey())
+          .apply("Read", ParDo.of(new ReadFn()));
+    }
+  }
+
+  private static class ReadFn extends DoFn<Read, KV<String, String>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      Read spec = c.element();
+      Jedis jedis = spec.connectionConfiguration().connect();
+
+      Multimap<BoundedWindow, String> bundles = ArrayListMultimap.create();
+      AtomicInteger batchCount = new AtomicInteger();
+
+      ScanParams scanParams = new ScanParams();
+      scanParams.match(spec.keyPattern());
+      String cursor = ScanParams.SCAN_POINTER_START;
+      boolean finished = false;
+      while (!finished) {
+        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+        List<String> keys = scanResult.getResult();
+        for (String key : keys) {
+//          c.output(k);
+//          String key = c.element();
+
+          bundles.put(window, key);
+          if (batchCount.incrementAndGet() > spec.batchSize()) {
+            Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+            for (BoundedWindow w : kvs.keySet()) {
+              for (KV<String, String> kv : kvs.get(w)) {
+                c.output(kv);
+              }
+            }
+          }
+
+        }
+        cursor = scanResult.getCursor();
+        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
+          finished = true;
+        }
+      }
+
+      bundles.put(window, key);
+      if (batchCount.incrementAndGet() > getBatchSize()) {
+        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+        for (BoundedWindow w : kvs.keySet()) {
+          for (KV<String, String> kv : kvs.get(w)) {
+            c.output(kv);
+          }
+        }
+      }
+    }
+
+    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+      Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+      for (BoundedWindow w : bundles.keySet()) {
+        String[] keys = new String[bundles.get(w).size()];
+        bundles.get(w).toArray(keys);
+        List<String> results = jedis.mget(keys);
+        for (int i = 0; i < results.size(); i++) {
+          if (results.get(i) != null) {
+            kvs.put(w, KV.of(keys[i], results.get(i)));
+          }
+        }
+      }
+      bundles = ArrayListMultimap.create();
+      batchCount.set(0);
+      return kvs;
+    }
+  }
+
   private abstract static class BaseReadFn<T> extends DoFn<String, T> {
     protected final RedisConnectionConfiguration connectionConfiguration;
 
     transient Jedis jedis;
 
-    BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    BaseReadFn() {
       this.connectionConfiguration = connectionConfiguration;
     }
 


[beam] 01/03: Deprecate RedisIO.readAll transform and add RedisIO.readKeys as a replacement

Posted by ie...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5ae8173528c5594a1be9f9a6263e150683d6de02
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Apr 13 17:28:08 2020 +0200

    Deprecate RedisIO.readAll transform and add RedisIO.readKeys as a replacement
---
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 46 +++++++++++++++-------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 313f6f2..77fbb6a 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -80,13 +80,13 @@ import redis.clients.jedis.ScanResult;
  *
  * }</pre>
  *
- * <p>{@link #readAll()} can be used to request Redis server using input PCollection elements as key
- * pattern (as String).
+ * <p>{@link #readKeyPatterns()} can be used to request Redis server using input PCollection
+ * elements as key pattern (as String).
  *
  * <pre>{@code
  * pipeline.apply(...)
  *    // here we have a PCollection<String> with the key patterns
- *    .apply(RedisIO.readAll().withEndpoint("::1", 6379))
+ *    .apply(RedisIO.readKeyPatterns().withEndpoint("::1", 6379))
  *   // here we have a PCollection<KV<String,String>>
  *
  * }</pre>
@@ -123,8 +123,24 @@ public class RedisIO {
    * Like {@link #read()} but executes multiple instances of the Redis query substituting each
    * element of a {@link PCollection} as key pattern.
    */
-  public static ReadAll readAll() {
-    return new AutoValue_RedisIO_ReadAll.Builder()
+  public static ReadKeyPatterns readKeyPatterns() {
+    return new AutoValue_RedisIO_ReadKeyPatterns.Builder()
+        .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .setBatchSize(1000)
+        .setOutputParallelization(true)
+        .build();
+  }
+
+  /**
+   * Like {@link #read()} but executes multiple instances of the Redis query substituting each
+   * element of a {@link PCollection} as key pattern.
+   *
+   * @deprecated This method is not consistent with the readAll pattern of other transforms and will
+   *     be remove soon. Please update you code to use {@link #readKeyPatterns()} instead.
+   */
+  @Deprecated
+  public static ReadKeyPatterns readAll() {
+    return new AutoValue_RedisIO_ReadKeyPatterns.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
         .setBatchSize(1000)
         .setOutputParallelization(true)
@@ -229,16 +245,16 @@ public class RedisIO {
           .apply(Create.of(keyPattern()))
           .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
           .apply(
-              RedisIO.readAll()
+              RedisIO.readKeyPatterns()
                   .withConnectionConfiguration(connectionConfiguration())
                   .withBatchSize(batchSize())
                   .withOutputParallelization(outputParallelization()));
     }
   }
 
-  /** Implementation of {@link #readAll()}. */
+  /** Implementation of {@link #readKeyPatterns()}. */
   @AutoValue
-  public abstract static class ReadAll
+  public abstract static class ReadKeyPatterns
       extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
 
     @Nullable
@@ -259,10 +275,10 @@ public class RedisIO {
 
       abstract Builder setOutputParallelization(boolean outputParallelization);
 
-      abstract ReadAll build();
+      abstract ReadKeyPatterns build();
     }
 
-    public ReadAll withEndpoint(String host, int port) {
+    public ReadKeyPatterns withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
       checkArgument(port > 0, "port can not be negative or 0");
       return toBuilder()
@@ -270,26 +286,26 @@ public class RedisIO {
           .build();
     }
 
-    public ReadAll withAuth(String auth) {
+    public ReadKeyPatterns withAuth(String auth) {
       checkArgument(auth != null, "auth can not be null");
       return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
           .build();
     }
 
-    public ReadAll withTimeout(int timeout) {
+    public ReadKeyPatterns withTimeout(int timeout) {
       checkArgument(timeout >= 0, "timeout can not be negative");
       return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
           .build();
     }
 
-    public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
+    public ReadKeyPatterns withConnectionConfiguration(RedisConnectionConfiguration connection) {
       checkArgument(connection != null, "connection can not be null");
       return toBuilder().setConnectionConfiguration(connection).build();
     }
 
-    public ReadAll withBatchSize(int batchSize) {
+    public ReadKeyPatterns withBatchSize(int batchSize) {
       return toBuilder().setBatchSize(batchSize).build();
     }
 
@@ -297,7 +313,7 @@ public class RedisIO {
      * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
      * default is to parallelize and should only be changed if this is known to be unnecessary.
      */
-    public ReadAll withOutputParallelization(boolean outputParallelization) {
+    public ReadKeyPatterns withOutputParallelization(boolean outputParallelization) {
       return toBuilder().setOutputParallelization(outputParallelization).build();
     }