You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/10/17 05:57:30 UTC
[1/2] beam git commit: [BEAM-1017] Add RedisIO
Repository: beam
Updated Branches:
refs/heads/master 4b502bf71 -> 9524cbd88
[BEAM-1017] Add RedisIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0d8ab6cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0d8ab6cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0d8ab6cb
Branch: refs/heads/master
Commit: 0d8ab6cbbc762dd9f9be1b3e9a26b6c9d0bb6dc3
Parents: 4b502bf
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Mar 22 19:03:00 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Oct 17 07:37:14 2017 +0200
----------------------------------------------------------------------
sdks/java/io/pom.xml | 1 +
sdks/java/io/redis/pom.xml | 90 ++++
.../io/redis/RedisConnectionConfiguration.java | 122 +++++
.../org/apache/beam/sdk/io/redis/RedisIO.java | 451 +++++++++++++++++++
.../apache/beam/sdk/io/redis/package-info.java | 22 +
.../apache/beam/sdk/io/redis/RedisIOTest.java | 109 +++++
6 files changed, 795 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 49eb796..99936a2 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -57,6 +57,7 @@
<module>kinesis</module>
<module>mongodb</module>
<module>mqtt</module>
+ <module>redis</module>
<module>solr</module>
<module>tika</module>
<module>xml</module>
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/pom.xml b/sdks/java/io/redis/pom.xml
new file mode 100644
index 0000000..d89e627
--- /dev/null
+++ b/sdks/java/io/redis/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-redis</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Redis</name>
+ <description>IO to read and write on a Redis keystore.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>2.9.0</version>
+ </dependency>
+
+ <!-- compile dependency -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <!-- Test-only, like the other entries of this section. -->
+      <scope>test</scope>
+    </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
new file mode 100644
index 0000000..efcc77b
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisConnectionConfiguration.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
+
+/**
+ * {@code RedisConnectionConfiguration} describes how to reach a Redis server: host, port,
+ * optional password and connection timeout.
+ *
+ * <p>Each {@code withXxx} method returns a new configuration built from this one, and
+ * {@link #connect()} opens a {@link Jedis} client from the current settings.
+ */
+@AutoValue
+public abstract class RedisConnectionConfiguration implements Serializable {
+
+  abstract String host();
+  abstract int port();
+  @Nullable abstract String auth();
+  abstract int timeout();
+
+  abstract Builder builder();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setHost(String host);
+    abstract Builder setPort(int port);
+    abstract Builder setAuth(String auth);
+    abstract Builder setTimeout(int timeout);
+    abstract RedisConnectionConfiguration build();
+  }
+
+  /**
+   * Creates a configuration using the Jedis {@link Protocol} defaults for host, port and
+   * timeout, and no password.
+   */
+  public static RedisConnectionConfiguration create() {
+    return new AutoValue_RedisConnectionConfiguration.Builder()
+        .setHost(Protocol.DEFAULT_HOST)
+        .setPort(Protocol.DEFAULT_PORT)
+        .setTimeout(Protocol.DEFAULT_TIMEOUT)
+        .build();
+  }
+
+  /**
+   * Creates a configuration for the given endpoint, with the default timeout and no password.
+   *
+   * @param host the Redis server host name, must not be {@code null}
+   * @param port the Redis server port, must be strictly positive
+   * @throws IllegalArgumentException if {@code host} is {@code null} or {@code port <= 0}
+   */
+  public static RedisConnectionConfiguration create(String host, int port) {
+    // Go through withHost/withPort so this factory validates its arguments exactly as they do.
+    return create().withHost(host).withPort(port);
+  }
+
+  /**
+   * Define the host name of the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code host} is {@code null}
+   */
+  public RedisConnectionConfiguration withHost(String host) {
+    checkArgument(host != null, "host can not be null");
+    return builder().setHost(host).build();
+  }
+
+  /**
+   * Define the port number of the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code port} is negative or 0
+   */
+  public RedisConnectionConfiguration withPort(int port) {
+    checkArgument(port > 0, "port can not be negative or 0");
+    return builder().setPort(port).build();
+  }
+
+  /**
+   * Define the password to authenticate on the Redis server.
+   *
+   * @throws IllegalArgumentException if {@code auth} is {@code null}
+   */
+  public RedisConnectionConfiguration withAuth(String auth) {
+    checkArgument(auth != null, "auth can not be null");
+    return builder().setAuth(auth).build();
+  }
+
+  /**
+   * Define the Redis connection timeout. A timeout of zero is interpreted as an infinite timeout.
+   *
+   * @throws IllegalArgumentException if {@code timeout} is negative
+   */
+  public RedisConnectionConfiguration withTimeout(int timeout) {
+    checkArgument(timeout >= 0, "timeout can not be negative");
+    return builder().setTimeout(timeout).build();
+  }
+
+  /**
+   * Connect to the Redis instance, authenticating when a password has been configured.
+   *
+   * <p>The caller owns the returned client and is responsible for closing it.
+   *
+   * @return a {@link Jedis} client for the configured host, port and timeout
+   */
+  public Jedis connect() {
+    Jedis jedis = new Jedis(host(), port(), timeout());
+    if (auth() != null) {
+      try {
+        jedis.auth(auth());
+      } catch (RuntimeException authFailure) {
+        // The caller never receives the client in this case, so release it here.
+        try {
+          jedis.close();
+        } catch (RuntimeException closeFailure) {
+          authFailure.addSuppressed(closeFailure);
+        }
+        throw authFailure;
+      }
+    }
+    return jedis;
+  }
+
+  /**
+   * Populate the display data with the host, port and timeout of this configuration. The
+   * password is not added to the display data.
+   */
+  public void populateDisplayData(DisplayData.Builder builder) {
+    builder.add(DisplayData.item("host", host()));
+    builder.add(DisplayData.item("port", port()));
+    // timeout() is a primitive int, so it is always present.
+    builder.add(DisplayData.item("timeout", timeout()));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
new file mode 100644
index 0000000..bfbad13
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+/**
+ * An IO to manipulate Redis key/value database.
+ *
+ * <h3>Reading Redis key/value pairs</h3>
+ *
+ * <p>{@link #read()} provides a source which returns a bounded {@link PCollection} containing
+ * key/value pairs as {@code KV<String, String>}.
+ *
+ * <p>To configure a Redis source, you have to provide Redis server hostname and port number.
+ * Optionally, you can provide a key pattern (to filter the keys). The following example
+ * illustrates how to configure a source:
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(RedisIO.read()
+ * .withEndpoint("::1", 6379)
+ * .withKeyPattern("foo*"))
+ *
+ * }</pre>
+ *
+ * <p>It's also possible to specify Redis authentication and connection timeout with the
+ * corresponding methods:
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(RedisIO.read()
+ * .withEndpoint("::1", 6379)
+ * .withAuth("authPassword")
+ * .withTimeout(60000)
+ * .withKeyPattern("foo*"))
+ *
+ * }</pre>
+ *
+ * <p>{@link #readAll()} can be used to request Redis server using input PCollection elements as key
+ * pattern (as String).
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(...)
+ * // here we have a PCollection<String> with the key patterns
+ * .apply(RedisIO.readAll().withEndpoint("::1", 6379))
+ * // here we have a PCollection<KV<String,String>>
+ *
+ * }</pre>
+ *
+ * <h3>Writing Redis key/value pairs</h3>
+ *
+ * <p>{@link #write()} provides a sink to write key/value pairs represented as
+ * {@link KV} from an incoming {@link PCollection}.
+ *
+ * <p>To configure the target Redis server, you have to provide Redis server hostname and port
+ * number. The following example illustrates how to configure a sink:
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(...)
+ * // here we have a PCollection<KV<String, String>> with key/value pairs
+ * .apply(RedisIO.write().withEndpoint("::1", 6379))
+ *
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class RedisIO {
+
+  /**
+   * Read data from a Redis server.
+   *
+   * <p>The returned transform starts from the default connection configuration and the
+   * {@code "*"} key pattern.
+   */
+  public static Read read() {
+    Read.Builder readBuilder = new AutoValue_RedisIO_Read.Builder();
+    readBuilder.setConnectionConfiguration(RedisConnectionConfiguration.create());
+    readBuilder.setKeyPattern("*");
+    return readBuilder.build();
+  }
+
+  /**
+   * Like {@link #read()} but executes multiple instances of the Redis query substituting each
+   * element of a {@link PCollection} as key pattern.
+   *
+   * <p>The returned transform starts from the default connection configuration.
+   */
+  public static ReadAll readAll() {
+    ReadAll.Builder readAllBuilder = new AutoValue_RedisIO_ReadAll.Builder();
+    readAllBuilder.setConnectionConfiguration(RedisConnectionConfiguration.create());
+    return readAllBuilder.build();
+  }
+
+  /**
+   * Write data to a Redis server.
+   *
+   * <p>The returned transform starts from the default connection configuration.
+   */
+  public static Write write() {
+    Write.Builder writeBuilder = new AutoValue_RedisIO_Write.Builder();
+    writeBuilder.setConnectionConfiguration(RedisConnectionConfiguration.create());
+    return writeBuilder.build();
+  }
+
+  /** Not instantiable: transforms are obtained from the static factory methods. */
+  private RedisIO() {}
+
+  /**
+   * Implementation of {@link #read()}.
+   *
+   * <p>Expands to a single-element collection holding the key pattern, followed by
+   * {@link #readAll()} configured with the same connection.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<String, String>>> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+    @Nullable abstract String keyPattern();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      @Nullable abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connection);
+      @Nullable abstract Builder setKeyPattern(String keyPattern);
+      abstract Read build();
+    }
+
+    /**
+     * Define the host name and port number of the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code host} is {@code null} or {@code port <= 0}
+     */
+    public Read withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Apply both changes to the same configuration. Setting the builder property twice, each
+      // time from the original configuration, keeps only the last call and drops the host.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /**
+     * Define the password to authenticate on the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code auth} is {@code null}
+     */
+    public Read withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    /**
+     * Define the Redis connection timeout.
+     *
+     * @throws IllegalArgumentException if {@code timeout} is negative
+     */
+    public Read withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    /**
+     * Define the pattern used to select the keys to read.
+     *
+     * @throws IllegalArgumentException if {@code keyPattern} is {@code null}
+     */
+    public Read withKeyPattern(String keyPattern) {
+      checkArgument(keyPattern != null, "keyPattern can not be null");
+      return builder().setKeyPattern(keyPattern).build();
+    }
+
+    /**
+     * Replace the whole connection configuration.
+     *
+     * @throws IllegalArgumentException if {@code connection} is {@code null}
+     */
+    public Read withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      connectionConfiguration().populateDisplayData(builder);
+    }
+
+    @Override
+    public PCollection<KV<String, String>> expand(PBegin input) {
+      checkArgument(connectionConfiguration() != null,
+          "withConnectionConfiguration() is required");
+
+      return input
+          .apply(Create.of(keyPattern()))
+          .apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
+    }
+
+  }
+
+  /**
+   * Implementation of {@link #readAll()}.
+   *
+   * <p>Each input element is used as a key pattern; the output holds the matching key/value
+   * pairs.
+   */
+  @AutoValue
+  public abstract static class ReadAll
+      extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+
+    abstract ReadAll.Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      @Nullable abstract ReadAll.Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connection);
+      abstract ReadAll build();
+    }
+
+    /**
+     * Define the host name and port number of the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code host} is {@code null} or {@code port <= 0}
+     */
+    public ReadAll withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Apply both changes to the same configuration. Setting the builder property twice, each
+      // time from the original configuration, keeps only the last call and drops the host.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /**
+     * Define the password to authenticate on the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code auth} is {@code null}
+     */
+    public ReadAll withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    /**
+     * Define the Redis connection timeout.
+     *
+     * @throws IllegalArgumentException if {@code timeout} is negative
+     */
+    public ReadAll withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    /**
+     * Replace the whole connection configuration.
+     *
+     * @throws IllegalArgumentException if {@code connection} is {@code null}
+     */
+    public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<String> input) {
+      checkArgument(connectionConfiguration() != null,
+          "withConnectionConfiguration() is required");
+
+      return input.apply(ParDo.of(new ReadFn(connectionConfiguration())))
+          .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+          .apply(new Reparallelize());
+    }
+
+  }
+
+  /**
+   * A {@link DoFn} requesting Redis server to get key/value pairs.
+   *
+   * <p>Each input element is a key pattern. The matching keys are enumerated with {@code SCAN}
+   * and, for every page of keys, the values are fetched with pipelined {@code GET} commands.
+   */
+  private static class ReadFn extends DoFn<String, KV<String, String>> {
+
+    private final RedisConnectionConfiguration connectionConfiguration;
+
+    private transient Jedis jedis;
+
+    public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      this.connectionConfiguration = connectionConfiguration;
+    }
+
+    @Setup
+    public void setup() {
+      jedis = connectionConfiguration.connect();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      ScanParams scanParams = new ScanParams();
+      scanParams.match(processContext.element());
+
+      String cursor = ScanParams.SCAN_POINTER_START;
+      do {
+        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+        List<String> keys = scanResult.getResult();
+
+        // Only open a pipeline when this page actually returned keys.
+        if (keys != null && !keys.isEmpty()) {
+          Pipeline pipeline = jedis.pipelined();
+          for (String key : keys) {
+            pipeline.get(key);
+          }
+          List<Object> values = pipeline.syncAndReturnAll();
+          for (int i = 0; i < values.size(); i++) {
+            String value = (String) values.get(i);
+            // A key may be gone by the time it is fetched; GET then yields null, which the
+            // String coder of the output collection cannot encode, so skip it.
+            if (value != null) {
+              processContext.output(KV.of(keys.get(i), value));
+            }
+          }
+        }
+
+        cursor = scanResult.getStringCursor();
+        // The scan is complete once the server hands back the start cursor.
+      } while (!ScanParams.SCAN_POINTER_START.equals(cursor));
+    }
+
+    @Teardown
+    public void teardown() {
+      // setup() may have failed before the client was assigned.
+      if (jedis != null) {
+        jedis.close();
+      }
+    }
+
+  }
+
+  /**
+   * Passes its input through unchanged, behind an always-empty side input and a reshuffle.
+   */
+  private static class Reparallelize
+      extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+    @Override
+    public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+      // Same approach as JdbcIO, used to break fusion: a filter that rejects every element
+      // feeds an empty view, which the identity step below takes as a side input.
+      PCollection<KV<String, String>> nothing = input.apply(
+          "Consume",
+          Filter.by(SerializableFunctions.<KV<String, String>, Boolean>constant(false)));
+      PCollectionView<Iterable<KV<String, String>>> emptyView =
+          nothing.apply(View.<KV<String, String>>asIterable());
+
+      DoFn<KV<String, String>, KV<String, String>> identity =
+          new DoFn<KV<String, String>, KV<String, String>>() {
+            @ProcessElement
+            public void processElement(ProcessContext context) {
+              context.output(context.element());
+            }
+          };
+
+      return input
+          .apply("Identity", ParDo.of(identity).withSideInputs(emptyView))
+          .apply(Reshuffle.<KV<String, String>>viaRandomKey());
+    }
+  }
+
+  /**
+   * A {@link PTransform} to write to a Redis server.
+   *
+   * <p>Each element is sent with the Redis {@code APPEND} command, so writing to a key that
+   * already exists extends its current value instead of replacing it.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<KV<String, String>>, PDone> {
+
+    @Nullable abstract RedisConnectionConfiguration connectionConfiguration();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Write build();
+
+    }
+
+    /**
+     * Define the host name and port number of the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code host} is {@code null} or {@code port <= 0}
+     */
+    public Write withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      // Apply both changes to the same configuration. Setting the builder property twice, each
+      // time from the original configuration, keeps only the last call and drops the host.
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    /**
+     * Define the password to authenticate on the Redis server.
+     *
+     * @throws IllegalArgumentException if {@code auth} is {@code null}
+     */
+    public Write withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    /**
+     * Define the Redis connection timeout.
+     *
+     * @throws IllegalArgumentException if {@code timeout} is negative
+     */
+    public Write withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return builder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    /**
+     * Replace the whole connection configuration.
+     *
+     * @throws IllegalArgumentException if {@code connection} is {@code null}
+     */
+    public Write withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return builder().setConnectionConfiguration(connection).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, String>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    /**
+     * A {@link DoFn} writing its elements through a Jedis pipeline, one {@code MULTI}/{@code EXEC}
+     * transaction per batch of at most {@link #DEFAULT_BATCH_SIZE} elements.
+     */
+    private static class WriteFn extends DoFn<KV<String, String>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final Write spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) {
+        KV<String, String> record = processContext.element();
+        pipeline.append(record.getKey(), record.getValue());
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          flushBatch();
+          // Open the next transaction; otherwise later writes would run outside of MULTI and
+          // the EXEC issued at the end of the bundle would have no transaction to execute.
+          pipeline.multi();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        // A transaction is always open here: startBundle and every batch flush open one.
+        flushBatch();
+      }
+
+      /** Executes the open transaction and reads back the replies of the pipelined commands. */
+      private void flushBatch() {
+        pipeline.exec();
+        pipeline.sync();
+        batchCount = 0;
+      }
+
+      @Teardown
+      public void teardown() {
+        // setup() may have failed before the client was assigned.
+        if (jedis != null) {
+          jedis.close();
+        }
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
new file mode 100644
index 0000000..a650acc
--- /dev/null
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Redis.
+ */
+package org.apache.beam.sdk.io.redis;
http://git-wip-us.apache.org/repos/asf/beam/blob/0d8ab6cb/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
new file mode 100644
index 0000000..b5ba847
--- /dev/null
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.redis;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import redis.embedded.RedisServer;
+
+/**
+ * Test on the Redis IO: writes key/value pairs to an embedded Redis server, then reads them back.
+ */
+public class RedisIOTest {
+
+  private static final String REDIS_HOST = "::1";
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  private EmbeddedRedis embeddedRedis;
+
+  @Before
+  public void before() throws Exception {
+    embeddedRedis = new EmbeddedRedis();
+  }
+
+  @After
+  public void after() throws Exception {
+    embeddedRedis.close();
+  }
+
+  @Test
+  public void testWriteRead() throws Exception {
+    int redisPort = embeddedRedis.getPort();
+
+    ArrayList<KV<String, String>> expected = new ArrayList<>();
+    int index = 0;
+    while (index < 100) {
+      expected.add(KV.of("key " + index, "value " + index));
+      index++;
+    }
+
+    // First pipeline: store the pairs.
+    writePipeline
+        .apply(Create.of(expected))
+        .apply(RedisIO.write().withEndpoint(REDIS_HOST, redisPort));
+    writePipeline.run();
+
+    // Second pipeline: a matching pattern returns every pair...
+    RedisIO.Read matching =
+        RedisIO.read().withEndpoint(REDIS_HOST, redisPort).withKeyPattern("key*");
+    PCollection<KV<String, String>> read = readPipeline.apply("Read", matching);
+    PAssert.that(read).containsInAnyOrder(expected);
+
+    // ...and a pattern matching no key returns nothing.
+    RedisIO.Read notMatching =
+        RedisIO.read().withEndpoint(REDIS_HOST, redisPort).withKeyPattern("foobar*");
+    PCollection<KV<String, String>> readNotMatch =
+        readPipeline.apply("ReadNotMatch", notMatching);
+    PCollection<Long> notMatchCount = readNotMatch.apply(Count.<KV<String, String>>globally());
+    PAssert.thatSingleton(notMatchCount).isEqualTo(0L);
+
+    readPipeline.run();
+  }
+
+  /**
+   * Simple embedded Redis instance wrapper to control Redis server.
+   */
+  private static class EmbeddedRedis implements AutoCloseable {
+
+    private final int port;
+    private final RedisServer redisServer;
+
+    public EmbeddedRedis() throws IOException {
+      port = findFreePort();
+      redisServer = new RedisServer(port);
+      redisServer.start();
+    }
+
+    /** Asks the OS for an ephemeral port by briefly binding a server socket to port 0. */
+    private static int findFreePort() throws IOException {
+      try (ServerSocket probe = new ServerSocket(0)) {
+        return probe.getLocalPort();
+      }
+    }
+
+    public int getPort() {
+      return this.port;
+    }
+
+    @Override
+    public void close() {
+      redisServer.stop();
+    }
+
+  }
+
+}
[2/2] beam git commit: [BEAM-1017] This closes #1687
Posted by jb...@apache.org.
[BEAM-1017] This closes #1687
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9524cbd8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9524cbd8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9524cbd8
Branch: refs/heads/master
Commit: 9524cbd88bba33d7fe01691c791f19fbe6239464
Parents: 4b502bf 0d8ab6c
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Oct 17 07:38:06 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Oct 17 07:38:06 2017 +0200
----------------------------------------------------------------------
sdks/java/io/pom.xml | 1 +
sdks/java/io/redis/pom.xml | 90 ++++
.../io/redis/RedisConnectionConfiguration.java | 122 +++++
.../org/apache/beam/sdk/io/redis/RedisIO.java | 451 +++++++++++++++++++
.../apache/beam/sdk/io/redis/package-info.java | 22 +
.../apache/beam/sdk/io/redis/RedisIOTest.java | 109 +++++
6 files changed, 795 insertions(+)
----------------------------------------------------------------------