You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2020/06/20 20:05:42 UTC
[beam] 03/03: --wip-- [skip ci]
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 98639d1685332700db7f163e614d63f02c955beb
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Wed Apr 15 19:49:15 2020 +0200
--wip-- [skip ci]
---
.../java/org/apache/beam/sdk/io/redis/RedisIO.java | 85 +++++++++++++++++++++-
1 file changed, 84 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 2757f2e..0c90ad8 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -131,6 +131,11 @@ public class RedisIO {
.build();
}
+  /**
+   * Like {@link #read()}, but takes its {@link Read} specifications from an input {@link
+   * PCollection} and executes each of them.
+   */
+  public static ReadAll readAll() {
+    return new ReadAll();
+  }
+
/** Write data to a Redis server. */
public static Write write() {
return new AutoValue_RedisIO_Write.Builder()
@@ -315,12 +320,90 @@ public class RedisIO {
}
}
+ /** Implementation of {@link #readKeyPatterns()}. */
+ public abstract static class ReadAll
+ extends PTransform<PCollection<Read>, PCollection<KV<String, String>>> {
+ @Override
+ public PCollection<KV<String, String>> expand(PCollection<Read> input) {
+ return input
+ // .apply("Split", ParDo.of(new SplitFn()))
+ .apply("Reshuffle", Reshuffle.viaRandomKey())
+ .apply("Read", ParDo.of(new ReadFn()));
+ }
+ }
+
+  /**
+   * For each input {@link Read}, reads every key matching {@link Read#keyPattern()} and outputs it
+   * together with its value.
+   *
+   * <p>Keys are enumerated with {@code SCAN} and their values fetched with {@code MGET} in batches
+   * of at most {@link Read#batchSize()} keys. Keys whose {@code MGET} reply is {@code null} are
+   * skipped. NOTE(review): {@code SCAN} may report the same key more than once; duplicates are not
+   * filtered here.
+   */
+  private static class ReadFn extends DoFn<Read, KV<String, String>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Read spec = c.element();
+      // try-with-resources: the connection is closed even if SCAN or MGET throws.
+      try (Jedis jedis = spec.connectionConfiguration().connect()) {
+        ScanParams scanParams = new ScanParams();
+        scanParams.match(spec.keyPattern());
+        List<String> batch = new java.util.ArrayList<>();
+        String cursor = ScanParams.SCAN_POINTER_START;
+        // A full SCAN iteration is complete once the server returns the start cursor again.
+        do {
+          ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+          for (String key : scanResult.getResult()) {
+            batch.add(key);
+            if (batch.size() >= spec.batchSize()) {
+              fetchAndOutput(jedis, batch, c);
+            }
+          }
+          cursor = scanResult.getCursor();
+        } while (!cursor.equals(ScanParams.SCAN_POINTER_START));
+        // Flush the last, possibly partial, batch so that no scanned key is dropped.
+        fetchAndOutput(jedis, batch, c);
+      }
+    }
+
+    /**
+     * Looks up {@code keys} with a single {@code MGET}, outputs every key whose value is non-null,
+     * then clears {@code keys} so the buffer can be reused for the next batch.
+     *
+     * @param jedis open connection used for the lookup
+     * @param keys buffered keys; emptied on return
+     * @param c context used to emit the {@link KV} pairs
+     */
+    private void fetchAndOutput(Jedis jedis, List<String> keys, ProcessContext c) {
+      // MGET with no arguments is rejected by Redis, so skip empty batches.
+      if (keys.isEmpty()) {
+        return;
+      }
+      List<String> values = jedis.mget(keys.toArray(new String[0]));
+      for (int i = 0; i < values.size(); i++) {
+        String value = values.get(i);
+        if (value != null) {
+          c.output(KV.of(keys.get(i), value));
+        }
+      }
+      keys.clear();
+    }
+  }
+
private abstract static class BaseReadFn<T> extends DoFn<String, T> {
protected final RedisConnectionConfiguration connectionConfiguration;
transient Jedis jedis;
- BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
+ BaseReadFn() {
this.connectionConfiguration = connectionConfiguration;
}