You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/03/22 17:53:32 UTC

[beam] branch master updated: [BEAM-6483] Add support for SADD operation to RedisIO.Write

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e2f06a  [BEAM-6483] Add support for SADD operation to RedisIO.Write
     new 664e5ab  Merge pull request #7587: [BEAM-6483] Add support for SADD operation to RedisIO.Write
3e2f06a is described below

commit 3e2f06a8f1634220e87649969e16d5b5d47aac12
Author: Kengo Seki <se...@apache.org>
AuthorDate: Tue Jan 22 10:11:10 2019 -0800

    [BEAM-6483] Add support for SADD operation to RedisIO.Write
    
    For now, RedisIO.Write supports write methods for string (APPEND, SET),
    list (LPUSH, RPUSH) and HyperLogLog (PFADD), but not for set (SADD).
    This PR adds it. In addition, I did the following refactoring in this:
    
    * make the input value check for port number stricter
    * replace a magic number indicating the end of a loop with a constant
    * remove an unnecessary argument from writeUsingHLLCommand,
      which is a private method used only internally
---
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 20 ++++++++++++----
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java  | 28 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 8d6b0be..06ba187 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -167,7 +167,7 @@ public class RedisIO {
 
     public Read withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
-      checkArgument(port > 0, "port can not be negative or 0");
+      checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536");
       return builder()
           .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
           .build();
@@ -320,7 +320,7 @@ public class RedisIO {
           processContext.output(k);
         }
         cursor = scanResult.getStringCursor();
-        if ("0".equals(cursor)) {
+        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
           finished = true;
         }
       }
@@ -446,6 +446,9 @@ public class RedisIO {
        */
       RPUSH,
 
+      /** Use SADD command. Insert value in a set. Duplicated values are ignored. */
+      SADD,
+
       /** Use PFADD command. Insert value in a HLL structure. Create key if it doesn't exist */
       PFADD
     }
@@ -570,8 +573,10 @@ public class RedisIO {
           writeUsingSetCommand(record, expireTime);
         } else if (Method.LPUSH == method || Method.RPUSH == method) {
           writeUsingListCommand(record, method, expireTime);
+        } else if (Method.SADD == method) {
+          writeUsingSaddCommand(record, expireTime);
         } else if (Method.PFADD == method) {
-          writeUsingHLLCommand(record, method, expireTime);
+          writeUsingHLLCommand(record, expireTime);
         }
       }
 
@@ -610,7 +615,14 @@ public class RedisIO {
         setExpireTimeWhenRequired(key, expireTime);
       }
 
-      private void writeUsingHLLCommand(KV<String, String> record, Method method, Long expireTime) {
+      private void writeUsingSaddCommand(KV<String, String> record, Long expireTime) {
+        String key = record.getKey();
+        String value = record.getValue();
+
+        pipeline.sadd(key, value);
+      }
+
+      private void writeUsingHLLCommand(KV<String, String> record, Long expireTime) {
         String key = record.getKey();
         String value = record.getValue();
 
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 529a854..c33355c 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.redis;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -29,6 +31,8 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,6 +191,30 @@ public class RedisIOTest {
   }
 
   @Test
+  public void testWriteReadUsingSaddMethod() throws Exception {
+    String key = "key";
+
+    Jedis jedis =
+        RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect();
+
+    List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
+    List<KV<String, String>> kvs = Lists.newArrayList();
+    for (String value : values) {
+      kvs.add(KV.of(key, value));
+    }
+
+    PCollection<KV<String, String>> write = writePipeline.apply(Create.of(kvs));
+    write.apply(
+        RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.SADD));
+
+    writePipeline.run();
+
+    Set<String> expected = Sets.newHashSet(values);
+    Set<String> members = jedis.smembers(key);
+    Assert.assertEquals(expected, members);
+  }
+
+  @Test
   public void testWriteUsingHLLMethod() throws Exception {
     String key = "key";