Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/09 12:33:23 UTC

[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r765643092



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {

Review comment:
      As the two test cases share the same streams/keys, the results currently depend on the execution order, since both tests share a single Redis instance (per class).

    One option to isolate the tests would be to simply use different streams/keys. Alternatively, you could add a configuration option that allows selecting a different Redis db (index) for isolation.

    Please note that isolation of tests is a critical requirement so that they can be run in parallel / forked if needed. Additionally, such tests serve as good documentation and help with understanding the code.
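    For illustration, a minimal sketch of the first option, using per-test key prefixes (the `uniqueKey` helper is hypothetical, `UUID` is `java.util.UUID`):
    ```java
    // Give every test its own key namespace so the two stream tests can't
    // interfere, even though they share one Redis instance per class.
    private static String uniqueKey(String key) {
      return UUID.randomUUID() + ":" + key;
    }

    @Test
    public void testWriteStreams() {
      List<String> keys =
          Stream.of("a", "b", "c").map(RedisIOTest::uniqueKey).collect(Collectors.toList());
      // ... build and run the pipeline against these keys only ...
    }
    ```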
   
   
   

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -142,6 +144,15 @@ public static Write write() {
         .build();
   }
 
+  /** Write data to a Redis server. */
+  public static WriteStreams writeStreams() {

Review comment:
       @n-oden Please document the usage of `writeStreams` in the Javadocs of RedisIO.
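    For example, a rough sketch following the style of the existing usage examples in the class Javadoc (exact wording up to you):
    ```java
    /**
     * <h3>Writing Redis streams</h3>
     *
     * <p>{@code RedisIO.writeStreams()} appends entries to Redis streams (XADD). It expects a
     * {@code PCollection<KV<String, Map<String, String>>>}, where the key is the stream key and
     * the map holds the field/value pairs of one stream entry:
     *
     * <pre>{@code
     * pipeline
     *   .apply(...) // provide PCollection<KV<String, Map<String, String>>>
     *   .apply(RedisIO.writeStreams().withEndpoint("::1", 6379));
     * }</pre>
     */
    ```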

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =

Review comment:
      Please use the (vendored) Guava `ImmutableMap` to generate your maps more easily, e.g.
   ```java
   Map<String, String> values = ImmutableMap.of(
     "foo", "bar",
     "baz", "qux"
   );
   ```
    You could, however, use a stream to generate your test data:
   ```java
       Map<String, String> values =
           ImmutableMap.of(
               "foo", "bar",
               "baz", "qux");
       List<KV<String, Map<String, String>>> data =
           Stream.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
               .map(key -> KV.of(key, values))
               .collect(Collectors.toList());
   ```

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();

Review comment:
       You're setting a default in `writeStreams()` already, so this doesn't have to be @Nullable anymore.

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();

Review comment:
      Same as above: `maxLen` doesn't have to be nullable and could actually be a primitive `long`.
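    A rough sketch of how that could look, assuming the defaults are set in `writeStreams()` and 0 is treated as "no trimming":
    ```java
    public static WriteStreams writeStreams() {
      return new AutoValue_RedisIO_WriteStreams.Builder()
          .setConnectionConfiguration(RedisConnectionConfiguration.create())
          .setMaxLen(0L) // 0 disables trimming, so no nullable boxed Long is needed
          .setApproximateTrim(false)
          .build();
    }

    // In WriteStreams, both accessors can then be non-nullable:
    abstract RedisConnectionConfiguration connectionConfiguration();

    abstract long maxLen();
    ```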

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
      I'm a bit concerned about error handling here, and the same applies in `processElement`. I know the code here happens to be the same as in the existing `write`, but this is probably a good time to discuss it.

    It's been a while since I last used `jedis`, but as far as I remember it's important to check each response to make sure errors are not silently ignored. What I'm honestly not sure about is whether all the nested responses have to be checked as well.
   
   ```java
             Response<List<Object>> resp = pipeline.exec();
             pipeline.close(); // does sync internally
             resp.get(); // this may throw
   ```
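    Applied to `finishBundle`, that might look roughly as follows (whether the individual command responses inside the transaction also need to be inspected is the open question above):
    ```java
    @FinishBundle
    public void finishBundle() {
      if (pipeline.isInMulti()) {
        Response<List<Object>> resp = pipeline.exec();
        pipeline.sync();
        // Reading the response after sync surfaces server-side errors
        // instead of silently dropping them.
        resp.get();
      }
      batchCount = 0;
    }
    ```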
   
   

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {

Review comment:
      It would be great to add Javadocs to all the builder methods, starting from here.
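    For example (the wording is just a suggestion, the method body is unchanged from the PR):
    ```java
    /** Sets the hostname and port of the Redis server to write to. */
    public WriteStreams withEndpoint(String host, int port) {
      checkArgument(host != null, "host can not be null");
      checkArgument(port > 0, "port can not be negative or 0");
      return toBuilder()
          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
          .build();
    }
    ```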

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void testWriteStreamsWithTruncation() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L));

Review comment:
      Please reconsider what and how you are testing here.
    As you are only adding one item per stream/key, there's really not much to truncate. Also, I'd recommend disabling approximate trim for testing; otherwise you likely won't get the result you're expecting.
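    A rough sketch of a truncation test that actually exercises trimming (the stream name and counts are illustrative):
    ```java
    @Test
    public void testWriteStreamsWithTruncation() {
      Map<String, String> values = ImmutableMap.of("foo", "bar", "baz", "qux");
      // Write 10 entries to a single stream so there is something to trim.
      List<KV<String, Map<String, String>>> data =
          Stream.generate(() -> KV.of("stream-to-trim", values))
              .limit(10)
              .collect(Collectors.toList());

      p.apply(
              Create.of(data)
                  .withCoder(
                      KvCoder.of(
                          StringUtf8Coder.of(),
                          MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))))
          .apply(
              RedisIO.writeStreams()
                  .withEndpoint(REDIS_HOST, port)
                  .withMaxLen(5L)
                  .withApproximateTrim(false)); // exact trim, so XLEN is deterministic
      p.run();

      long count = client.xlen("stream-to-trim");
      assertEquals(5, count);
    }
    ```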

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);

Review comment:
       Please make sure there's at least one test case to verify the data is also correctly written to the stream.
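    For example, inside the existing loop over `keys`, something like the following sketch (assuming Jedis treats `null` start/end in `xrange` as the full range):
    ```java
    // Verify the entry contents, not just the stream length.
    List<StreamEntry> entries = client.xrange(key, null, null, Integer.MAX_VALUE);
    for (StreamEntry entry : entries) {
      assertEquals(ImmutableMap.of("foo", "bar", "baz", "qux"), entry.getFields());
    }
    ```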




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org