You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page; its target was not preserved in this plain text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/19 12:35:51 UTC

[flink-statefun] 03/04: [FLINK-16106] [core] Let StateBinder handle PersistedAppendingBuffer

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 20e8b5cd34484b35ec52cc441a63073c1dce399e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 17 14:18:24 2020 +0800

    [FLINK-16106] [core] Let StateBinder handle PersistedAppendingBuffer
    
    This closes #25.
---
 .../flink/statefun/flink/core/state/BoundState.java   | 19 +++++++++++++++++--
 .../statefun/flink/core/state/PersistedStates.java    |  5 ++++-
 .../flink/statefun/flink/core/state/StateBinder.java  | 16 ++++++++++++++++
 .../apache/flink/statefun/sdk/state/ApiExtension.java |  5 +++++
 .../statefun/flink/core/state/StateBinderTest.java    | 13 +++++++++++++
 5 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
index f7b131a..d86cab1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
@@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.state;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -31,11 +32,15 @@ public class BoundState {
 
   private final List<PersistedValue<?>> persistedValues;
   private final List<PersistedTable<?, ?>> persistedTables;
+  private final List<PersistedAppendingBuffer<?>> persistedAppendingBuffers;
 
   private BoundState(
-      List<PersistedValue<?>> persistedValues, List<PersistedTable<?, ?>> persistedTables) {
+      List<PersistedValue<?>> persistedValues,
+      List<PersistedTable<?, ?>> persistedTables,
+      List<PersistedAppendingBuffer<?>> persistedAppendingBuffers) {
     this.persistedValues = Objects.requireNonNull(persistedValues);
     this.persistedTables = Objects.requireNonNull(persistedTables);
+    this.persistedAppendingBuffers = Objects.requireNonNull(persistedAppendingBuffers);
   }
 
   @SuppressWarnings("unused")
@@ -48,10 +53,15 @@ public class BoundState {
     return persistedTables;
   }
 
+  public List<PersistedAppendingBuffer<?>> getPersistedAppendingBuffers() {
+    return persistedAppendingBuffers;
+  }
+
   @SuppressWarnings("UnusedReturnValue")
   public static final class Builder {
     private List<PersistedValue<?>> persistedValues = new ArrayList<>();
     private List<PersistedTable<?, ?>> persistedTables = new ArrayList<>();
+    private List<PersistedAppendingBuffer<?>> persistedAppendingBuffers = new ArrayList<>();
 
     private Builder() {}
 
@@ -65,8 +75,13 @@ public class BoundState {
       return this;
     }
 
+    public Builder withPersistedList(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
+      this.persistedAppendingBuffers.add(persistedAppendingBuffer);
+      return this;
+    }
+
     public BoundState build() {
-      return new BoundState(persistedValues, persistedTables);
+      return new BoundState(persistedValues, persistedTables, persistedAppendingBuffers);
     }
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
index f787986..c890e56 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -76,7 +77,9 @@ final class PersistedStates {
   }
 
   private static boolean isPersistedState(Class<?> fieldType) {
-    return fieldType == PersistedValue.class || fieldType == PersistedTable.class;
+    return fieldType == PersistedValue.class
+        || fieldType == PersistedTable.class
+        || fieldType == PersistedAppendingBuffer.class;
   }
 
   private static Object getPersistedValueReflectively(Object instance, Field persistedField) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
index a7b0c4c..4415068 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
@@ -26,6 +26,8 @@ import org.apache.flink.statefun.flink.core.state.BoundState.Builder;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.state.Accessor;
 import org.apache.flink.statefun.sdk.state.ApiExtension;
+import org.apache.flink.statefun.sdk.state.AppendingBufferAccessor;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 import org.apache.flink.statefun.sdk.state.TableAccessor;
@@ -53,6 +55,13 @@ public final class StateBinder {
             state.createFlinkStateTableAccessor(functionType, persistedTable);
         setAccessorRaw(persistedTable, accessor);
         builder.withPersistedTable(persistedTable);
+      } else if (persisted instanceof PersistedAppendingBuffer) {
+        PersistedAppendingBuffer<?> persistedAppendingBuffer =
+            (PersistedAppendingBuffer<?>) persisted;
+        AppendingBufferAccessor<?> accessor =
+            state.createFlinkStateAppendingBufferAccessor(functionType, persistedAppendingBuffer);
+        setAccessorRaw(persistedAppendingBuffer, accessor);
+        builder.withPersistedList(persistedAppendingBuffer);
       } else {
         throw new IllegalArgumentException("Unknown persisted field " + persisted);
       }
@@ -69,4 +78,11 @@ public final class StateBinder {
   private static void setAccessorRaw(PersistedValue<?> persistedValue, Accessor<?> accessor) {
     ApiExtension.setPersistedValueAccessor((PersistedValue) persistedValue, accessor);
   }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static void setAccessorRaw(
+      PersistedAppendingBuffer<?> persistedAppendingBuffer, AppendingBufferAccessor<?> accessor) {
+    ApiExtension.setPersistedAppendingBufferAccessor(
+        (PersistedAppendingBuffer) persistedAppendingBuffer, accessor);
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
index d8cefd4..585590e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
@@ -27,4 +27,9 @@ public class ApiExtension {
       PersistedTable<K, V> persistedTable, TableAccessor<K, V> accessor) {
     persistedTable.setAccessor(accessor);
   }
+
+  public static <E> void setPersistedAppendingBufferAccessor(
+      PersistedAppendingBuffer<E> persistedAppendingBuffer, AppendingBufferAccessor<E> accessor) {
+    persistedAppendingBuffer.setAccessor(accessor);
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
index da428f0..0f54f04 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
@@ -87,6 +87,13 @@ public class StateBinderTest {
   }
 
   @Test
+  public void bindPersistedAppendingBuffer() {
+    binderUnderTest.bind(TestUtils.FUNCTION_TYPE, new PersistedAppendingBufferState());
+
+    assertThat(state.boundNames, hasItems("buffer"));
+  }
+
+  @Test
   public void bindComposedState() {
     binderUnderTest.bind(TestUtils.FUNCTION_TYPE, new OuterClass());
 
@@ -141,6 +148,12 @@ public class StateBinderTest {
     PersistedTable<String, byte[]> value = PersistedTable.of("table", String.class, byte[].class);
   }
 
+  static final class PersistedAppendingBufferState {
+    @Persisted
+    @SuppressWarnings("unused")
+    PersistedAppendingBuffer<Boolean> value = PersistedAppendingBuffer.of("buffer", Boolean.class);
+  }
+
   static final class InnerClass {
     @Persisted
     @SuppressWarnings("unused")