You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/10 03:43:31 UTC

[flink-statefun] 02/09: [FLINK-15954] Add PersistedTable to bound state

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 45f4d17a6d955470b28aad48d30aa8178c38a953
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 7 17:23:30 2020 +0100

    [FLINK-15954] Add PersistedTable to bound state
    
    This commit adds the PersistedTable and also refactors this class
    to use a builder.
---
 .../statefun/flink/core/state/BoundState.java      | 44 ++++++++++++++++++++--
 1 file changed, 40 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
index 6a99de2..f7b131a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
@@ -19,18 +19,54 @@ package org.apache.flink.statefun.flink.core.state;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
 public class BoundState {
 
-  private final List<PersistedValue<Object>> persistedValues;
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  private final List<PersistedValue<?>> persistedValues;
+  private final List<PersistedTable<?, ?>> persistedTables;
 
-  BoundState(List<PersistedValue<Object>> persistedValues) {
-    this.persistedValues = new ArrayList<>(persistedValues);
+  private BoundState(
+      List<PersistedValue<?>> persistedValues, List<PersistedTable<?, ?>> persistedTables) {
+    this.persistedValues = Objects.requireNonNull(persistedValues);
+    this.persistedTables = Objects.requireNonNull(persistedTables);
   }
 
   @SuppressWarnings("unused")
-  public List<PersistedValue<Object>> persistedValues() {
+  public List<PersistedValue<?>> persistedValues() {
     return persistedValues;
   }
+
+  @SuppressWarnings("unused")
+  public List<PersistedTable<?, ?>> getPersistedTables() {
+    return persistedTables;
+  }
+
+  @SuppressWarnings("UnusedReturnValue")
+  public static final class Builder {
+    private List<PersistedValue<?>> persistedValues = new ArrayList<>();
+    private List<PersistedTable<?, ?>> persistedTables = new ArrayList<>();
+
+    private Builder() {}
+
+    public Builder withPersistedValue(PersistedValue<?> persistedValue) {
+      this.persistedValues.add(persistedValue);
+      return this;
+    }
+
+    public Builder withPersistedTable(PersistedTable<?, ?> persistedTable) {
+      this.persistedTables.add(persistedTable);
+      return this;
+    }
+
+    public BoundState build() {
+      return new BoundState(persistedValues, persistedTables);
+    }
+  }
 }