You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2020/06/20 20:05:40 UTC
[beam] 01/03: Deprecate RedisIO.readAll transform and add
RedisIO.readKeys as a replacement
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5ae8173528c5594a1be9f9a6263e150683d6de02
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Apr 13 17:28:08 2020 +0200
Deprecate RedisIO.readAll transform and add RedisIO.readKeys as a replacement
---
.../java/org/apache/beam/sdk/io/redis/RedisIO.java | 46 +++++++++++++++-------
1 file changed, 31 insertions(+), 15 deletions(-)
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 313f6f2..77fbb6a 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -80,13 +80,13 @@ import redis.clients.jedis.ScanResult;
*
* }</pre>
*
- * <p>{@link #readAll()} can be used to request Redis server using input PCollection elements as key
- * pattern (as String).
+ * <p>{@link #readKeyPatterns()} can be used to request Redis server using input PCollection
+ * elements as key pattern (as String).
*
* <pre>{@code
* pipeline.apply(...)
* // here we have a PCollection<String> with the key patterns
- * .apply(RedisIO.readAll().withEndpoint("::1", 6379))
+ * .apply(RedisIO.readKeyPatterns().withEndpoint("::1", 6379))
* // here we have a PCollection<KV<String,String>>
*
* }</pre>
@@ -123,8 +123,24 @@ public class RedisIO {
* Like {@link #read()} but executes multiple instances of the Redis query substituting each
* element of a {@link PCollection} as key pattern.
*/
- public static ReadAll readAll() {
- return new AutoValue_RedisIO_ReadAll.Builder()
+ public static ReadKeyPatterns readKeyPatterns() {
+ return new AutoValue_RedisIO_ReadKeyPatterns.Builder()
+ .setConnectionConfiguration(RedisConnectionConfiguration.create())
+ .setBatchSize(1000)
+ .setOutputParallelization(true)
+ .build();
+ }
+
+ /**
+ * Like {@link #read()} but executes multiple instances of the Redis query substituting each
+ * element of a {@link PCollection} as key pattern.
+ *
+ * @deprecated This method is not consistent with the readAll pattern of other transforms and will
+ * be remove soon. Please update you code to use {@link #readKeyPatterns()} instead.
+ */
+ @Deprecated
+ public static ReadKeyPatterns readAll() {
+ return new AutoValue_RedisIO_ReadKeyPatterns.Builder()
.setConnectionConfiguration(RedisConnectionConfiguration.create())
.setBatchSize(1000)
.setOutputParallelization(true)
@@ -229,16 +245,16 @@ public class RedisIO {
.apply(Create.of(keyPattern()))
.apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
.apply(
- RedisIO.readAll()
+ RedisIO.readKeyPatterns()
.withConnectionConfiguration(connectionConfiguration())
.withBatchSize(batchSize())
.withOutputParallelization(outputParallelization()));
}
}
- /** Implementation of {@link #readAll()}. */
+ /** Implementation of {@link #readKeyPatterns()}. */
@AutoValue
- public abstract static class ReadAll
+ public abstract static class ReadKeyPatterns
extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
@Nullable
@@ -259,10 +275,10 @@ public class RedisIO {
abstract Builder setOutputParallelization(boolean outputParallelization);
- abstract ReadAll build();
+ abstract ReadKeyPatterns build();
}
- public ReadAll withEndpoint(String host, int port) {
+ public ReadKeyPatterns withEndpoint(String host, int port) {
checkArgument(host != null, "host can not be null");
checkArgument(port > 0, "port can not be negative or 0");
return toBuilder()
@@ -270,26 +286,26 @@ public class RedisIO {
.build();
}
- public ReadAll withAuth(String auth) {
+ public ReadKeyPatterns withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withAuth(auth))
.build();
}
- public ReadAll withTimeout(int timeout) {
+ public ReadKeyPatterns withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
.build();
}
- public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
+ public ReadKeyPatterns withConnectionConfiguration(RedisConnectionConfiguration connection) {
checkArgument(connection != null, "connection can not be null");
return toBuilder().setConnectionConfiguration(connection).build();
}
- public ReadAll withBatchSize(int batchSize) {
+ public ReadKeyPatterns withBatchSize(int batchSize) {
return toBuilder().setBatchSize(batchSize).build();
}
@@ -297,7 +313,7 @@ public class RedisIO {
* Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
* default is to parallelize and should only be changed if this is known to be unnecessary.
*/
- public ReadAll withOutputParallelization(boolean outputParallelization) {
+ public ReadKeyPatterns withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}