You are viewing a plain text version of this content. The canonical version was linked from the original page; the hyperlink is not preserved in this plain text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:09:03 UTC

[flink-statefun] 12/17: [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b3f6b844e9f9b568b7c94f68b7cfdf758e8e66dc
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:32:07 2020 +0200

    [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
---
 .../flink/statefun/flink/core/logger/Loggers.java     | 19 +++++++++++++++++--
 .../flink/core/logger/UnboundedFeedbackLogger.java    | 11 ++++-------
 .../core/logger/UnboundedFeedbackLoggerTest.java      |  2 +-
 3 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 62720bf..948a808 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -43,10 +43,25 @@ public final class Loggers {
       TypeSerializer<?> serializer,
       Function<?, ?> keySelector) {
 
+    UnboundedFeedbackLoggerFactory<?> factory =
+        unboundedSpillableLoggerFactory(
+            ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
+
+    return factory.create();
+  }
+
+  public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
+      IOManager ioManager,
+      int maxParallelism,
+      long inMemoryMaxBufferSize,
+      TypeSerializer<?> serializer,
+      Function<?, ?> keySelector) {
+
     ObjectContainer container =
         unboundedSpillableLoggerContainer(
             ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
-    return container.get(UnboundedFeedbackLogger.class);
+
+    return container.get(UnboundedFeedbackLoggerFactory.class);
   }
 
   /** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */
@@ -70,7 +85,7 @@ public final class Loggers {
         "checkpoint-stream-ops",
         CheckpointedStreamOperations.class,
         KeyedStateCheckpointOutputStreamOps.INSTANCE);
-    container.add(UnboundedFeedbackLogger.class);
+    container.add(UnboundedFeedbackLoggerFactory.class);
     return container;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index 8ddc22e..ef0360a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.statefun.flink.core.di.Inject;
-import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
@@ -50,12 +48,11 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
   private TypeSerializer<T> serializer;
   private Closeable snapshotLease;
 
-  @Inject
   public UnboundedFeedbackLogger(
-      @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
-      @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
-      @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
-      @Label("envelope-serializer") TypeSerializer<T> serializer) {
+      Supplier<KeyGroupStream<T>> supplier,
+      ToIntFunction<T> keyGroupAssigner,
+      CheckpointedStreamOperations ops,
+      TypeSerializer<T> serializer) {
     this.supplier = Objects.requireNonNull(supplier);
     this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
     this.serializer = Objects.requireNonNull(serializer);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 8b66b21..75c4c55 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -115,7 +115,7 @@ public class UnboundedFeedbackLoggerTest {
             IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity());
 
     container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE);
-    return container.get(UnboundedFeedbackLogger.class);
+    return container.get(UnboundedFeedbackLoggerFactory.class).create();
   }
 
   enum NOOP implements CheckpointedStreamOperations {