Posted to github@beam.apache.org by "sigalite (via GitHub)" <gi...@apache.org> on 2023/04/07 09:52:49 UTC

[GitHub] [beam] sigalite opened a new issue, #26165: [Feature Request]: RedisIO support PCollection of <byte[], byte[]> in addition to the <String, String> API

sigalite opened a new issue, #26165:
URL: https://github.com/apache/beam/issues/26165

   ### What would you like to happen?
   
   Add a new API to RedisIO that supports <byte[], byte[]>; currently the API supports only Strings.
   When we want to save encoded Avro messages to Redis and decode them when reading, this fails.
   The problem is that casting the Avro byte array to a String, or creating a new String from it, alters the payload, so it cannot be decoded easily when reading.
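   
   For illustration, here is a minimal, self-contained sketch (not Beam-specific, and not our connector code) of the failure mode: arbitrary binary payloads such as encoded Avro records do not survive a round trip through String, because decoding with a charset like UTF-8 replaces malformed byte sequences.
   
   ```java
   // Illustrative only: round-tripping binary data through String is lossy.
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   
   public class LossyRoundTrip {
     public static void main(String[] args) {
       // Stand-in for an Avro-encoded payload; contains sequences that are not valid UTF-8.
       byte[] payload = new byte[] {(byte) 0xC3, 0x28, 0x00, (byte) 0xFF};
       // What a KV<String, String> API forces today: bytes -> String -> bytes.
       String asString = new String(payload, StandardCharsets.UTF_8);
       byte[] roundTripped = asString.getBytes(StandardCharsets.UTF_8);
       // Prints false: malformed sequences were replaced, so Avro decoding would fail.
       System.out.println(Arrays.equals(payload, roundTripped));
     }
   }
   ```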
   
   In our current implementation we wrote a custom Redis connector that does this, and we would be happy to contribute the code,
   either as a new connector or as new APIs on the existing connector.
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner




[GitHub] [beam] TeoZosa commented on issue #26165: [Feature Request]: RedisIO support PCollection of <byte[], byte[]> in addition to the <String, String> API

Posted by "TeoZosa (via GitHub)" <gi...@apache.org>.
TeoZosa commented on issue #26165:
URL: https://github.com/apache/beam/issues/26165#issuecomment-1655118265

   +1 for this feature as well. In our case we use marshaled Protobuf bytes and stringifying the byte array leads to data loss and subsequent corruption. 
   
   For an immediate fix, we did the simplest thing and vendored the Redis IO, swapping `<String, String>` to `<byte[], byte[]>`. @sigalite's solution sounds more robust?
   
   <details> <summary> ex. changes to the `Write` class (click to view) </summary>
   
   ```diff
   ===================================================================
   diff --git a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java
   --- a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java	(revision aaa)
   +++ b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java	(revision bbb)
   @@ -20,6 +20,7 @@
    import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
    
    import com.google.auto.value.AutoValue;
   +import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.coders.KvCoder;
   @@ -437,9 +438,9 @@
        }
      }
    
   -  /** A {@link PTransform} to write to a Redis server. */
   +  /** A {@link PTransform} to write to a Redis server. */
      @AutoValue
   -  public abstract static class Write extends PTransform<PCollection<KV<String, String>>, PDone> {
   +  public abstract static class Write extends PTransform<PCollection<KV<byte[], byte[]>>, PDone> {
    
        /** Determines the method used to insert data in Redis. */
        public enum Method {
   @@ -540,14 +541,14 @@
        }
    
        @Override
   -    public PDone expand(PCollection<KV<String, String>> input) {
   +    public PDone expand(PCollection<KV<byte[], byte[]>> input) {
          checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
    
          input.apply(ParDo.of(new WriteFn(this)));
          return PDone.in(input.getPipeline());
        }
    
   -    private static class WriteFn extends DoFn<KV<String, String>, Void> {
   +    private static class WriteFn extends DoFn<KV<byte[], byte[]>, Void> {
    
          private static final int DEFAULT_BATCH_SIZE = 1000;
    
   @@ -575,7 +576,7 @@
    
          @ProcessElement
          public void processElement(ProcessContext c) {
   -        KV<String, String> record = c.element();
   +        KV<byte[], byte[]> record = c.element();
    
            writeRecord(record);
    
   @@ -588,7 +589,7 @@
            }
          }
    
   -      private void writeRecord(KV<String, String> record) {
   +      private void writeRecord(KV<byte[], byte[]> record) {
            Method method = spec.method();
            Long expireTime = spec.expireTime();
    
   @@ -609,18 +610,18 @@
            }
          }
    
   -      private void writeUsingAppendCommand(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingAppendCommand(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.append(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingSetCommand(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingSetCommand(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            if (expireTime != null) {
              transaction.psetex(key, expireTime, value);
   @@ -630,10 +631,10 @@
          }
    
          private void writeUsingListCommand(
   -          KV<String, String> record, Method method, Long expireTime) {
   +          KV<byte[], byte[]> record, Method method, Long expireTime) {
    
   -        String key = record.getKey();
   -        String value = record.getValue();
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            if (Method.LPUSH == method) {
              transaction.lpush(key, value);
   @@ -644,43 +645,43 @@
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingSaddCommand(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingSaddCommand(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.sadd(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingHLLCommand(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingHLLCommand(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.pfadd(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingIncrBy(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   -        long inc = Long.parseLong(value);
   +      private void writeUsingIncrBy(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
   +        long inc = Long.parseLong(new String(value, StandardCharsets.ISO_8859_1));
            transaction.incrBy(key, inc);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingDecrBy(KV<String, String> record, Long expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   -        long decr = Long.parseLong(value);
   +      private void writeUsingDecrBy(KV<byte[], byte[]> record, Long expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
   +        long decr = Long.parseLong(new String(value, StandardCharsets.ISO_8859_1));
            transaction.decrBy(key, decr);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void setExpireTimeWhenRequired(String key, Long expireTime) {
   +      private void setExpireTimeWhenRequired(byte[] key, Long expireTime) {
            if (expireTime != null) {
              transaction.pexpire(key, expireTime);
            }
   @@ -706,12 +707,12 @@
      }
    
      /**
   -   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
   +   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
       * Redis server.
       */
      @AutoValue
      public abstract static class WriteStreams
   -      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
   +      extends PTransform<PCollection<KV<byte[], Map<byte[], byte[]>>>, PDone> {
    
        abstract RedisConnectionConfiguration connectionConfiguration();
    
   @@ -784,14 +785,14 @@
        }
    
        @Override
   -    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
   +    public PDone expand(PCollection<KV<byte[], Map<byte[], byte[]>>> input) {
          checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
    
          input.apply(ParDo.of(new WriteStreamFn(this)));
          return PDone.in(input.getPipeline());
        }
    
   -    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
   +    private static class WriteStreamFn extends DoFn<KV<byte[], Map<byte[], byte[]>>, Void> {
    
          private static final int DEFAULT_BATCH_SIZE = 1000;
    
   @@ -819,7 +820,7 @@
    
          @ProcessElement
          public void processElement(ProcessContext c) {
   -        KV<String, Map<String, String>> record = c.element();
   +        KV<byte[], Map<byte[], byte[]>> record = c.element();
    
            writeRecord(record);
    
   @@ -832,9 +833,9 @@
            }
          }
    
   -      private void writeRecord(KV<String, Map<String, String>> record) {
   -        String key = record.getKey();
   -        Map<String, String> value = record.getValue();
   +      private void writeRecord(KV<byte[], Map<byte[], byte[]>> record) {
   +        byte[] key = record.getKey();
   +        Map<byte[], byte[]> value = record.getValue();
            final XAddParams params = new XAddParams().id(StreamEntryID.NEW_ENTRY);
            if (spec.maxLen() > 0L) {
              params.maxLen(spec.maxLen());
   
   ```
   
   </details>
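   
   For completeness, a standalone sketch (outside Beam) of what the vendored change relies on: Jedis's binary command overloads take raw byte[] keys and values, so no charset conversion is involved. It assumes a Redis server reachable at localhost:6379; adjust as needed.
   
   ```java
   // Standalone check that Jedis's binary overloads preserve raw bytes end-to-end.
   import java.util.Arrays;
   import redis.clients.jedis.Jedis;
   
   public class BinaryRoundTrip {
     public static void main(String[] args) {
       byte[] key = new byte[] {0x01, (byte) 0xFF};
       byte[] value = new byte[] {(byte) 0xC3, 0x28, 0x00}; // not valid UTF-8 on purpose
       try (Jedis jedis = new Jedis("localhost", 6379)) {   // assumed local Redis instance
         jedis.set(key, value);                             // binary overload, no String conversion
         byte[] readBack = jedis.get(key);                  // binary overload
         System.out.println(Arrays.equals(value, readBack)); // true: bytes are preserved
       }
     }
   }
   ```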

