You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/10 03:43:31 UTC
[flink-statefun] 02/09: [FLINK-15954] Add PersistedTable to bound
state
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 45f4d17a6d955470b28aad48d30aa8178c38a953
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 7 17:23:30 2020 +0100
[FLINK-15954] Add PersistedTable to bound state
This commit adds the PersistedTable and also refactors this class
to use a builder.
---
.../statefun/flink/core/state/BoundState.java | 44 ++++++++++++++++++++--
1 file changed, 40 insertions(+), 4 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
index 6a99de2..f7b131a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
@@ -19,18 +19,54 @@ package org.apache.flink.statefun.flink.core.state;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
public class BoundState {
- private final List<PersistedValue<Object>> persistedValues;
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ private final List<PersistedValue<?>> persistedValues;
+ private final List<PersistedTable<?, ?>> persistedTables;
- BoundState(List<PersistedValue<Object>> persistedValues) {
- this.persistedValues = new ArrayList<>(persistedValues);
+ private BoundState(
+ List<PersistedValue<?>> persistedValues, List<PersistedTable<?, ?>> persistedTables) {
+ this.persistedValues = Objects.requireNonNull(persistedValues);
+ this.persistedTables = Objects.requireNonNull(persistedTables);
}
@SuppressWarnings("unused")
- public List<PersistedValue<Object>> persistedValues() {
+ public List<PersistedValue<?>> persistedValues() {
return persistedValues;
}
+
+ @SuppressWarnings("unused")
+ public List<PersistedTable<?, ?>> getPersistedTables() {
+ return persistedTables;
+ }
+
+ @SuppressWarnings("UnusedReturnValue")
+ public static final class Builder {
+ private List<PersistedValue<?>> persistedValues = new ArrayList<>();
+ private List<PersistedTable<?, ?>> persistedTables = new ArrayList<>();
+
+ private Builder() {}
+
+ public Builder withPersistedValue(PersistedValue<?> persistedValue) {
+ this.persistedValues.add(persistedValue);
+ return this;
+ }
+
+ public Builder withPersistedTable(PersistedTable<?, ?> persistedTable) {
+ this.persistedTables.add(persistedTable);
+ return this;
+ }
+
+ public BoundState build() {
+ return new BoundState(persistedValues, persistedTables);
+ }
+ }
}