You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/19 12:35:51 UTC
[flink-statefun] 03/04: [FLINK-16106] [core] Let StateBinder handle
PersistedAppendingBuffer
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 20e8b5cd34484b35ec52cc441a63073c1dce399e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 17 14:18:24 2020 +0800
[FLINK-16106] [core] Let StateBinder handle PersistedAppendingBuffer
This closes #25.
---
.../flink/statefun/flink/core/state/BoundState.java | 19 +++++++++++++++++--
.../statefun/flink/core/state/PersistedStates.java | 5 ++++-
.../flink/statefun/flink/core/state/StateBinder.java | 16 ++++++++++++++++
.../apache/flink/statefun/sdk/state/ApiExtension.java | 5 +++++
.../statefun/flink/core/state/StateBinderTest.java | 13 +++++++++++++
5 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
index f7b131a..d86cab1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
@@ -20,6 +20,7 @@ package org.apache.flink.statefun.flink.core.state;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -31,11 +32,15 @@ public class BoundState {
private final List<PersistedValue<?>> persistedValues;
private final List<PersistedTable<?, ?>> persistedTables;
+ private final List<PersistedAppendingBuffer<?>> persistedAppendingBuffers;
private BoundState(
- List<PersistedValue<?>> persistedValues, List<PersistedTable<?, ?>> persistedTables) {
+ List<PersistedValue<?>> persistedValues,
+ List<PersistedTable<?, ?>> persistedTables,
+ List<PersistedAppendingBuffer<?>> persistedAppendingBuffers) {
this.persistedValues = Objects.requireNonNull(persistedValues);
this.persistedTables = Objects.requireNonNull(persistedTables);
+ this.persistedAppendingBuffers = Objects.requireNonNull(persistedAppendingBuffers);
}
@SuppressWarnings("unused")
@@ -48,10 +53,15 @@ public class BoundState {
return persistedTables;
}
+ public List<PersistedAppendingBuffer<?>> getPersistedAppendingBuffers() {
+ return persistedAppendingBuffers;
+ }
+
@SuppressWarnings("UnusedReturnValue")
public static final class Builder {
private List<PersistedValue<?>> persistedValues = new ArrayList<>();
private List<PersistedTable<?, ?>> persistedTables = new ArrayList<>();
+ private List<PersistedAppendingBuffer<?>> persistedAppendingBuffers = new ArrayList<>();
private Builder() {}
@@ -65,8 +75,13 @@ public class BoundState {
return this;
}
+ public Builder withPersistedList(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
+ this.persistedAppendingBuffers.add(persistedAppendingBuffer);
+ return this;
+ }
+
public BoundState build() {
- return new BoundState(persistedValues, persistedTables);
+ return new BoundState(persistedValues, persistedTables, persistedAppendingBuffers);
}
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
index f787986..c890e56 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/PersistedStates.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -76,7 +77,9 @@ final class PersistedStates {
}
private static boolean isPersistedState(Class<?> fieldType) {
- return fieldType == PersistedValue.class || fieldType == PersistedTable.class;
+ return fieldType == PersistedValue.class
+ || fieldType == PersistedTable.class
+ || fieldType == PersistedAppendingBuffer.class;
}
private static Object getPersistedValueReflectively(Object instance, Field persistedField) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
index a7b0c4c..4415068 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/StateBinder.java
@@ -26,6 +26,8 @@ import org.apache.flink.statefun.flink.core.state.BoundState.Builder;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.state.Accessor;
import org.apache.flink.statefun.sdk.state.ApiExtension;
+import org.apache.flink.statefun.sdk.state.AppendingBufferAccessor;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.statefun.sdk.state.TableAccessor;
@@ -53,6 +55,13 @@ public final class StateBinder {
state.createFlinkStateTableAccessor(functionType, persistedTable);
setAccessorRaw(persistedTable, accessor);
builder.withPersistedTable(persistedTable);
+ } else if (persisted instanceof PersistedAppendingBuffer) {
+ PersistedAppendingBuffer<?> persistedAppendingBuffer =
+ (PersistedAppendingBuffer<?>) persisted;
+ AppendingBufferAccessor<?> accessor =
+ state.createFlinkStateAppendingBufferAccessor(functionType, persistedAppendingBuffer);
+ setAccessorRaw(persistedAppendingBuffer, accessor);
+ builder.withPersistedList(persistedAppendingBuffer);
} else {
throw new IllegalArgumentException("Unknown persisted field " + persisted);
}
@@ -69,4 +78,11 @@ public final class StateBinder {
private static void setAccessorRaw(PersistedValue<?> persistedValue, Accessor<?> accessor) {
ApiExtension.setPersistedValueAccessor((PersistedValue) persistedValue, accessor);
}
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static void setAccessorRaw(
+ PersistedAppendingBuffer<?> persistedAppendingBuffer, AppendingBufferAccessor<?> accessor) {
+ ApiExtension.setPersistedAppendingBufferAccessor(
+ (PersistedAppendingBuffer) persistedAppendingBuffer, accessor);
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
index d8cefd4..585590e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java
@@ -27,4 +27,9 @@ public class ApiExtension {
PersistedTable<K, V> persistedTable, TableAccessor<K, V> accessor) {
persistedTable.setAccessor(accessor);
}
+
+ public static <E> void setPersistedAppendingBufferAccessor(
+ PersistedAppendingBuffer<E> persistedAppendingBuffer, AppendingBufferAccessor<E> accessor) {
+ persistedAppendingBuffer.setAccessor(accessor);
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
index da428f0..0f54f04 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/StateBinderTest.java
@@ -87,6 +87,13 @@ public class StateBinderTest {
}
@Test
+ public void bindPersistedAppendingBuffer() {
+ binderUnderTest.bind(TestUtils.FUNCTION_TYPE, new PersistedAppendingBufferState());
+
+ assertThat(state.boundNames, hasItems("buffer"));
+ }
+
+ @Test
public void bindComposedState() {
binderUnderTest.bind(TestUtils.FUNCTION_TYPE, new OuterClass());
@@ -141,6 +148,12 @@ public class StateBinderTest {
PersistedTable<String, byte[]> value = PersistedTable.of("table", String.class, byte[].class);
}
+ static final class PersistedAppendingBufferState {
+ @Persisted
+ @SuppressWarnings("unused")
+ PersistedAppendingBuffer<Boolean> value = PersistedAppendingBuffer.of("buffer", Boolean.class);
+ }
+
static final class InnerClass {
@Persisted
@SuppressWarnings("unused")