You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/10/17 05:57:30 UTC

[1/2] beam git commit: [BEAM-1017] Add RedisIO

Repository: beam
Updated Branches:
  refs/heads/master 4b502bf71 -> 9524cbd88


[BEAM-1017] Add RedisIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0d8ab6cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0d8ab6cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0d8ab6cb

Branch: refs/heads/master
Commit: 0d8ab6cbbc762dd9f9be1b3e9a26b6c9d0bb6dc3
Parents: 4b502bf
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Mar 22 19:03:00 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Oct 17 07:37:14 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/redis/pom.xml                      |  90 ++++
 .../io/redis/RedisConnectionConfiguration.java  | 122 +++++
 .../org/apache/beam/sdk/io/redis/RedisIO.java   | 451 +++++++++++++++++++
 .../apache/beam/sdk/io/redis/package-info.java  |  22 +
 .../apache/beam/sdk/io/redis/RedisIOTest.java   | 109 +++++
 6 files changed, 795 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 49eb796..99936a2 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -57,6 +57,7 @@
     <module>kinesis</module>
     <module>mongodb</module>
     <module>mqtt</module>
+    <module>redis</module>
     <module>solr</module>
     <module>tika</module>
     <module>xml</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/pom.xml b/sdks/java/io/redis/pom.xml
new file mode 100644
index 0000000..d89e627
--- /dev/null
+++ b/sdks/java/io/redis/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-redis</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Redis</name>
+  <description>IO to read and write on a Redis keystore.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>redis.clients</groupId>
+      <artifactId>jedis</artifactId>
+      <version>2.9.0</version>
+    </dependency>
+
+    <!-- compile dependency -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.github.kstyrc</groupId>
+      <artifactId>embedded-redis</artifactId>
+      <version>0.6</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
new file mode 100644
index 0000000..efcc77b
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+/**
+ * {@code RedisConnectionConfiguration} describes and wraps the settings used to connect to a
+ * single Redis server: host, port, optional password and socket timeout.
+ *
+ * <p>Instances are immutable; every {@code with*} method returns a modified copy.
+ */
+@AutoValue
+public abstract class RedisConnectionConfiguration implements Serializable {
+
+  abstract String host();
+  abstract int port();
+  @Nullable abstract String auth();
+  abstract int timeout();
+
+  abstract Builder builder();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setHost(String host);
+    abstract Builder setPort(int port);
+    abstract Builder setAuth(String auth);
+    abstract Builder setTimeout(int timeout);
+    abstract RedisConnectionConfiguration build();
+  }
+
+  /**
+   * Creates a configuration using the Jedis defaults: {@link Protocol#DEFAULT_HOST},
+   * {@link Protocol#DEFAULT_PORT}, {@link Protocol#DEFAULT_TIMEOUT} and no password.
+   */
+  public static RedisConnectionConfiguration create() {
+    return create(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
+  }
+
+  /**
+   * Creates a configuration for the given host and port, using the Jedis default timeout
+   * ({@link Protocol#DEFAULT_TIMEOUT}) and no password.
+   */
+  public static RedisConnectionConfiguration create(String host, int port) {
+    return new AutoValue_RedisConnectionConfiguration.Builder()
+        .setHost(host)
+        .setPort(port)
+        .setTimeout(Protocol.DEFAULT_TIMEOUT)
+        .build();
+  }
+
+  /**
+   * Define the host name of the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code host} is null
+   */
+  public RedisConnectionConfiguration withHost(String host) {
+    checkArgument(host != null, "host can not be null");
+    return builder().setHost(host).build();
+  }
+
+  /**
+   * Define the port number of the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code port} is not strictly positive
+   */
+  public RedisConnectionConfiguration withPort(int port) {
+    checkArgument(port > 0, "port can not be negative or 0");
+    return builder().setPort(port).build();
+  }
+
+  /**
+   * Define the password to authenticate on the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code auth} is null
+   */
+  public RedisConnectionConfiguration withAuth(String auth) {
+    checkArgument(auth != null, "auth can not be null");
+    return builder().setAuth(auth).build();
+  }
+
+  /**
+   * Define the Redis connection timeout. A timeout of zero is interpreted as an infinite timeout.
+   *
+   * @throws IllegalArgumentException if {@code timeout} is negative
+   */
+  public RedisConnectionConfiguration withTimeout(int timeout) {
+    checkArgument(timeout >= 0, "timeout can not be negative");
+    return builder().setTimeout(timeout).build();
+  }
+
+  /**
+   * Connect to the Redis instance, sending the configured password with {@code AUTH} when one
+   * is set.
+   *
+   * <p>The caller owns the returned {@link Jedis} and is responsible for closing it.
+   */
+  public Jedis connect() {
+    Jedis jedis = new Jedis(host(), port(), timeout());
+    if (auth() != null) {
+      try {
+        jedis.auth(auth());
+      } catch (RuntimeException e) {
+        // Authentication failed: release the connection instead of leaking it, while keeping
+        // the original failure as the thrown exception.
+        try {
+          jedis.close();
+        } catch (RuntimeException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+    }
+    return jedis;
+  }
+
+  /**
+   * Populate the display data with connection details. The password is deliberately not
+   * included.
+   */
+  public void populateDisplayData(DisplayData.Builder builder) {
+    builder.add(DisplayData.item("host", host()));
+    builder.add(DisplayData.item("port", port()));
+    builder.add(DisplayData.item("timeout", timeout()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
new file mode 100644
index 0000000..bfbad13
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+/**
+ * An IO to read key/value pairs from, and write key/value pairs to, a Redis server.
+ *
+ * <h3>Reading Redis key/value pairs</h3>
+ *
+ * <p>{@link #read()} provides a source which returns a bounded {@link PCollection} containing
+ * key/value pairs as {@code KV<String, String>}.
+ *
+ * <p>To configure a Redis source, you have to provide the Redis server hostname and port number.
+ * Optionally, you can provide a key pattern (to filter the keys). The following example
+ * illustrates how to configure a source:
+ *
+ * <pre>{@code
+ *
+ *  pipeline.apply(RedisIO.read()
+ *    .withEndpoint("::1", 6379)
+ *    .withKeyPattern("foo*"))
+ *
+ * }</pre>
+ *
+ * <p>It's also possible to specify Redis authentication and connection timeout with the
+ * corresponding methods:
+ *
+ * <pre>{@code
+ *
+ *  pipeline.apply(RedisIO.read()
+ *    .withEndpoint("::1", 6379)
+ *    .withAuth("authPassword")
+ *    .withTimeout(60000)
+ *    .withKeyPattern("foo*"))
+ *
+ * }</pre>
+ *
+ * <p>{@link #readAll()} can be used to query the Redis server using the elements of an input
+ * {@code PCollection<String>} as key patterns.
+ *
+ * <pre>{@code
+ *
+ *  pipeline.apply(...)
+ *    // here we have a PCollection<String> with the key patterns
+ *    .apply(RedisIO.readAll().withEndpoint("::1", 6379))
+ *    // here we have a PCollection<KV<String, String>>
+ *
+ * }</pre>
+ *
+ * <h3>Writing Redis key/value pairs</h3>
+ *
+ * <p>{@link #write()} provides a sink to write key/value pairs represented as
+ * {@link KV} from an incoming {@link PCollection}. Values are written with the Redis
+ * {@code APPEND} command, so they are appended to any value already stored under the same key.
+ *
+ * <p>To configure the target Redis server, you have to provide the Redis server
+ * hostname and port number. The following example illustrates how to configure a sink:
+ *
+ * <pre>{@code
+ *
+ *  pipeline.apply(...)
+ *    // here we have a PCollection<KV<String, String>> with key/value pairs
+ *    .apply(RedisIO.write().withEndpoint("::1", 6379))
+ *
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class RedisIO {
+
+  /**
+   * Read data from a Redis server.
+   */
+  public static Read read() {
+    return new AutoValue_RedisIO_Read.Builder()
+        .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .setKeyPattern("*").build();
+  }
+
+  /**
+   * Like {@link #read()} but executes multiple instances of the Redis query substituting each
+   * element of a {@link PCollection} as key pattern.
+   */
+  public static ReadAll readAll() {
+    return new AutoValue_RedisIO_ReadAll.Builder()
+        .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .build();
+  }
+
+  /**
+   * Write data to a Redis server.
+   */
+  public static Write write() {
+    return new AutoValue_RedisIO_Write.Builder()
+        .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .build();
+  }
+
+  private RedisIO() {
+  }
+
+  /**
+   * Implementation of {@link #read()}.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<String, String>>> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+    @Nullable abstract String keyPattern();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      @Nullable abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connection);
+      @Nullable abstract Builder setKeyPattern(String keyPattern);
+      abstract Read build();
+    }
+
+    /**
+     * Define the host and port of the Redis server. The other connection settings (password,
+     * timeout) are kept.
+     */
+    public Read withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Chain both changes on one configuration: deriving host and port from two separate
+      // copies of the current configuration would let the second overwrite the first.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /** Define the password used to authenticate on the Redis server. */
+    public Read withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+    }
+
+    /** Define the connection timeout; zero means no timeout. */
+    public Read withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder().setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    /** Define the pattern (Redis {@code SCAN MATCH} syntax) the read keys must match. */
+    public Read withKeyPattern(String keyPattern) {
+      checkArgument(keyPattern != null, "keyPattern can not be null");
+      return builder().setKeyPattern(keyPattern).build();
+    }
+
+    /** Replace the whole connection configuration. */
+    public Read withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      connectionConfiguration().populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("keyPattern", keyPattern()));
+    }
+
+    @Override
+    public PCollection<KV<String, String>> expand(PBegin input) {
+      checkArgument(connectionConfiguration() != null,
+          "withConnectionConfiguration() is required");
+      checkArgument(keyPattern() != null, "withKeyPattern() is required");
+
+      return input
+          .apply(Create.of(keyPattern()))
+          .apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
+    }
+
+  }
+
+  /**
+   * Implementation of {@link #readAll()}.
+   */
+  @AutoValue
+  public abstract static class ReadAll
+      extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+
+    abstract ReadAll.Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      @Nullable abstract ReadAll.Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connection);
+      abstract ReadAll build();
+    }
+
+    /**
+     * Define the host and port of the Redis server. The other connection settings (password,
+     * timeout) are kept.
+     */
+    public ReadAll withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Chain both changes on one configuration so that neither overwrites the other.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /** Define the password used to authenticate on the Redis server. */
+    public ReadAll withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+    }
+
+    /** Define the connection timeout; zero means no timeout. */
+    public ReadAll withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout)).build();
+    }
+
+    /** Replace the whole connection configuration. */
+    public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      connectionConfiguration().populateDisplayData(builder);
+    }
+
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<String> input) {
+      checkArgument(connectionConfiguration() != null,
+          "withConnectionConfiguration() is required");
+
+      return input.apply(ParDo.of(new ReadFn(connectionConfiguration())))
+          .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+          .apply(new Reparallelize());
+    }
+
+  }
+
+  /**
+   * A {@link DoFn} that SCANs the Redis server for the keys matching the input pattern and
+   * outputs each matching key together with its string value.
+   */
+  private static class ReadFn extends DoFn<String, KV<String, String>> {
+
+    private final RedisConnectionConfiguration connectionConfiguration;
+
+    private transient Jedis jedis;
+
+    public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      this.connectionConfiguration = connectionConfiguration;
+    }
+
+    @Setup
+    public void setup() {
+      jedis = connectionConfiguration.connect();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      ScanParams scanParams = new ScanParams();
+      scanParams.match(processContext.element());
+
+      String cursor = ScanParams.SCAN_POINTER_START;
+      do {
+        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+        List<String> keys = scanResult.getResult();
+
+        if (keys != null && !keys.isEmpty()) {
+          // Fetch the values of this SCAN page in a single pipelined round trip.
+          Pipeline pipeline = jedis.pipelined();
+          for (String key : keys) {
+            pipeline.get(key);
+          }
+          List<Object> values = pipeline.syncAndReturnAll();
+          for (int i = 0; i < values.size(); i++) {
+            Object value = values.get(i);
+            // A key can be deleted or expire between SCAN and GET, in which case GET returns
+            // null; StringUtf8Coder cannot encode null, so such keys are skipped.
+            if (value != null) {
+              processContext.output(KV.of(keys.get(i), (String) value));
+            }
+          }
+        }
+
+        cursor = scanResult.getStringCursor();
+        // SCAN signals the end of the iteration by returning the start cursor again.
+      } while (!ScanParams.SCAN_POINTER_START.equals(cursor));
+    }
+
+    @Teardown
+    public void teardown() {
+      if (jedis != null) {
+        jedis.close();
+      }
+    }
+
+  }
+
+  /**
+   * Redistributes the elements read from Redis so that downstream steps are not fused with the
+   * (single-element-driven) SCAN step.
+   */
+  private static class Reparallelize
+      extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+      // Mimics JdbcIO: an always-empty side input derived from the input breaks fusion between
+      // the producer and the identity ParDo, then a reshuffle spreads the elements.
+      PCollectionView<Iterable<KV<String, String>>> empty = input
+          .apply("Consume",
+              Filter.by(SerializableFunctions.<KV<String, String>, Boolean>constant(false)))
+          .apply(View.<KV<String, String>>asIterable());
+      PCollection<KV<String, String>> materialized = input
+          .apply("Identity", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
+            @ProcessElement
+            public void processElement(ProcessContext context) {
+              context.output(context.element());
+            }
+          }).withSideInputs(empty));
+      return materialized.apply(Reshuffle.<KV<String, String>>viaRandomKey());
+    }
+  }
+
+  /**
+   * A {@link PTransform} to write to a Redis server.
+   *
+   * <p>Each element is written with the Redis {@code APPEND} command. Commands are pipelined and
+   * grouped into {@code MULTI}/{@code EXEC} transactions of at most 1000 elements.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<KV<String, String>>, PDone> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Write build();
+
+    }
+
+    /**
+     * Define the host and port of the Redis server. The other connection settings (password,
+     * timeout) are kept.
+     */
+    public Write withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Chain both changes on one configuration so that neither overwrites the other.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /** Define the password used to authenticate on the Redis server. */
+    public Write withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    /** Define the connection timeout; zero means no timeout. */
+    public Write withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    /** Replace the whole connection configuration. */
+    public Write withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      connectionConfiguration().populateDisplayData(builder);
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, String>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    /**
+     * {@link DoFn} appending each element to Redis inside pipelined MULTI/EXEC transactions.
+     */
+    private static class WriteFn extends DoFn<KV<String, String>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final Write spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) {
+        KV<String, String> record = processContext.element();
+        pipeline.append(record.getKey(), record.getValue());
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          flush();
+          // Reopen a transaction for the rest of the bundle; otherwise the next appends run
+          // outside MULTI and the EXEC issued in finishBundle fails ("EXEC without MULTI").
+          pipeline.multi();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        // Always close the transaction opened in startBundle or after the last batch; an
+        // empty MULTI/EXEC is valid.
+        flush();
+      }
+
+      /**
+       * Commits the current MULTI transaction and reads all pending pipeline replies, so the
+       * queued commands are actually sent to and processed by the server.
+       */
+      private void flush() {
+        pipeline.exec();
+        pipeline.sync();
+        batchCount = 0;
+      }
+
+      @Teardown
+      public void teardown() {
+        if (jedis != null) {
+          jedis.close();
+        }
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
new file mode 100644
index 0000000..a650acc
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Redis.
+ */
+package org.apache.beam.sdk.io.redis;

http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
new file mode 100644
index 0000000..b5ba847
--- /dev/null
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import redis.embedded.RedisServer;
+
+/**
+ * Test on the Redis IO, against an embedded Redis server started for each test.
+ */
+public class RedisIOTest {
+
+  // Loopback by name instead of the IPv6 literal "::1", which fails on hosts without IPv6.
+  private static final String REDIS_HOST = "localhost";
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  private EmbeddedRedis embeddedRedis;
+
+  @Before
+  public void before() throws Exception {
+    embeddedRedis = new EmbeddedRedis();
+  }
+
+  @After
+  public void after() throws Exception {
+    // If before() failed, there is no server to stop; avoid masking that failure with an NPE.
+    if (embeddedRedis != null) {
+      embeddedRedis.close();
+    }
+  }
+
+  @Test
+  public void testWriteRead() throws Exception {
+    ArrayList<KV<String, String>> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      KV<String, String> kv = KV.of("key " + i, "value " + i);
+      data.add(kv);
+    }
+    PCollection<KV<String, String>> write = writePipeline.apply(Create.of(data));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()));
+
+    writePipeline.run();
+
+    PCollection<KV<String, String>> read = readPipeline.apply("Read",
+        RedisIO.read().withEndpoint(REDIS_HOST, embeddedRedis.getPort())
+            .withKeyPattern("key*"));
+    PAssert.that(read).containsInAnyOrder(data);
+
+    PCollection<KV<String, String>> readNotMatch = readPipeline.apply("ReadNotMatch",
+        RedisIO.read().withEndpoint(REDIS_HOST, embeddedRedis.getPort())
+            .withKeyPattern("foobar*"));
+    PAssert.thatSingleton(readNotMatch.apply(Count.<KV<String, String>>globally())).isEqualTo(0L);
+
+    readPipeline.run();
+  }
+
+  /**
+   * Simple embedded Redis instance wrapper to control Redis server.
+   */
+  private static class EmbeddedRedis implements AutoCloseable {
+
+    private final int port;
+    private final RedisServer redisServer;
+
+    public EmbeddedRedis() throws IOException {
+      // Ask the OS for a currently free port, then release it for the Redis server to bind.
+      try (ServerSocket serverSocket = new ServerSocket(0)) {
+        port = serverSocket.getLocalPort();
+      }
+      redisServer = new RedisServer(port);
+      redisServer.start();
+    }
+
+    public int getPort() {
+      return this.port;
+    }
+
+    @Override
+    public void close() {
+      redisServer.stop();
+    }
+
+  }
+
+}


[2/2] beam git commit: [BEAM-1017] This closes #1687

Posted by jb...@apache.org.
[BEAM-1017] This closes #1687


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9524cbd8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9524cbd8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9524cbd8

Branch: refs/heads/master
Commit: 9524cbd88bba33d7fe01691c791f19fbe6239464
Parents: 4b502bf 0d8ab6c
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Oct 17 07:38:06 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Oct 17 07:38:06 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/redis/pom.xml                      |  90 ++++
 .../io/redis/RedisConnectionConfiguration.java  | 122 +++++
 .../org/apache/beam/sdk/io/redis/RedisIO.java   | 451 +++++++++++++++++++
 .../apache/beam/sdk/io/redis/package-info.java  |  22 +
 .../apache/beam/sdk/io/redis/RedisIOTest.java   | 109 +++++
 6 files changed, 795 insertions(+)
----------------------------------------------------------------------