You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:09:03 UTC
[flink-statefun] 12/17: [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b3f6b844e9f9b568b7c94f68b7cfdf758e8e66dc
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 21:32:07 2020 +0200
[FLINK-17533] Expose the UnboundedFeedbackLoggerFactory
---
.../flink/statefun/flink/core/logger/Loggers.java | 19 +++++++++++++++++--
.../flink/core/logger/UnboundedFeedbackLogger.java | 11 ++++-------
.../core/logger/UnboundedFeedbackLoggerTest.java | 2 +-
3 files changed, 22 insertions(+), 10 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index 62720bf..948a808 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -43,10 +43,25 @@ public final class Loggers {
TypeSerializer<?> serializer,
Function<?, ?> keySelector) {
+ UnboundedFeedbackLoggerFactory<?> factory =
+ unboundedSpillableLoggerFactory(
+ ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
+
+ return factory.create();
+ }
+
+ public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(
+ IOManager ioManager,
+ int maxParallelism,
+ long inMemoryMaxBufferSize,
+ TypeSerializer<?> serializer,
+ Function<?, ?> keySelector) {
+
ObjectContainer container =
unboundedSpillableLoggerContainer(
ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector);
- return container.get(UnboundedFeedbackLogger.class);
+
+ return container.get(UnboundedFeedbackLoggerFactory.class);
}
/** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */
@@ -70,7 +85,7 @@ public final class Loggers {
"checkpoint-stream-ops",
CheckpointedStreamOperations.class,
KeyedStateCheckpointOutputStreamOps.INSTANCE);
- container.add(UnboundedFeedbackLogger.class);
+ container.add(UnboundedFeedbackLoggerFactory.class);
return container;
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index 8ddc22e..ef0360a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.statefun.flink.core.di.Inject;
-import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.util.IOUtils;
@@ -50,12 +48,11 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
private TypeSerializer<T> serializer;
private Closeable snapshotLease;
- @Inject
public UnboundedFeedbackLogger(
- @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier,
- @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner,
- @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops,
- @Label("envelope-serializer") TypeSerializer<T> serializer) {
+ Supplier<KeyGroupStream<T>> supplier,
+ ToIntFunction<T> keyGroupAssigner,
+ CheckpointedStreamOperations ops,
+ TypeSerializer<T> serializer) {
this.supplier = Objects.requireNonNull(supplier);
this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner);
this.serializer = Objects.requireNonNull(serializer);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index 8b66b21..75c4c55 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -115,7 +115,7 @@ public class UnboundedFeedbackLoggerTest {
IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity());
container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE);
- return container.get(UnboundedFeedbackLogger.class);
+ return container.get(UnboundedFeedbackLoggerFactory.class).create();
}
enum NOOP implements CheckpointedStreamOperations {