You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/19 12:35:49 UTC
[flink-statefun] 01/04: [FLINK-16106] [sdk] Add
PersistedAppendingBuffer to SDK
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 9efe9ec0529cab3f67d475d1f5764088fe3e76ac
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 17 13:29:46 2020 +0800
[FLINK-16106] [sdk] Add PersistedAppendingBuffer to SDK
---
.../sdk/state/AppendingBufferAccessor.java | 35 ++++
.../sdk/state/PersistedAppendingBuffer.java | 225 +++++++++++++++++++++
2 files changed, 260 insertions(+)
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/AppendingBufferAccessor.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/AppendingBufferAccessor.java
new file mode 100644
index 0000000..e14eb13
--- /dev/null
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/AppendingBufferAccessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.state;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+public interface AppendingBufferAccessor<E> {
+
+ void append(E element);
+
+ void appendAll(List<E> elements);
+
+ void replaceWith(List<E> elements);
+
+ @Nullable
+ Iterable<E> view();
+
+ void clear();
+}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
new file mode 100644
index 0000000..98374d1
--- /dev/null
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.state;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+
+/**
+ * A {@link PersistedAppendingBuffer} is an append-only buffer registered within {@link
+ * StatefulFunction}s and is persisted and maintained by the system for fault-tolerance. Persisted
+ * elements in the buffer may only be updated with bulk replacements.
+ *
+ * <p>Created persisted buffers must be registered by using the {@link Persisted} annotation. Please
+ * see the class-level Javadoc of {@link StatefulFunction} for an example on how to do that.
+ *
+ * @see StatefulFunction
+ * @param <E> type of the list elements.
+ */
+public final class PersistedAppendingBuffer<E> {
+ private final String name;
+ private final Class<E> elementType;
+ private AppendingBufferAccessor<E> accessor;
+
+ private PersistedAppendingBuffer(
+ String name, Class<E> elementType, AppendingBufferAccessor<E> accessor) {
+ this.name = Objects.requireNonNull(name);
+ this.elementType = Objects.requireNonNull(elementType);
+ this.accessor = Objects.requireNonNull(accessor);
+ }
+
+ /**
+ * Creates a {@link PersistedAppendingBuffer} instance that may be used to access persisted state
+ * managed by the system. Access to the persisted buffer is identified by an unique name and type
+ * of the elements. These may not change across multiple executions of the application.
+ *
+ * @param name the unique name of the persisted buffer state
+ * @param elementType the type of the elements of this {@code PersistedAppendingBuffer}.
+ * @param <E> the type of the elements.
+ * @return a {@code PersistedAppendingBuffer} instance.
+ */
+ public static <E> PersistedAppendingBuffer<E> of(String name, Class<E> elementType) {
+ return new PersistedAppendingBuffer<>(name, elementType, new NonFaultTolerantAccessor<>());
+ }
+
+ /**
+ * Returns the unique name of the persisted buffer.
+ *
+ * @return unique name of the persisted buffer.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the type of the persisted buffer elements.
+ *
+ * @return the type of the persisted buffer elements.
+ */
+ public Class<E> elementType() {
+ return elementType;
+ }
+
+ /**
+ * Appends an element to the persisted buffer.
+ *
+ * <p>If {@code null} is passed in, then this method has no effect and the persisted buffer
+ * remains the same.
+ *
+ * @param element the element to add to the persisted buffer.
+ */
+ public void append(@Nullable E element) {
+ if (element != null) {
+ accessor.append(element);
+ }
+ }
+
+ /**
+ * Adds all elements of a list to the persisted buffer.
+ *
+ * <p>If {@code null} or an empty list is passed in, then this method has no effect and the
+ * persisted buffer remains the same.
+ *
+ * @param elements a list of elements to add to the persisted buffer.
+ */
+ public void appendAll(@Nullable List<E> elements) {
+ if (elements != null && !elements.isEmpty()) {
+ accessor.appendAll(elements);
+ }
+ }
+
+ /**
+ * Replace the elements in the persisted buffer with the provided list of elements.
+ *
+ * <p>If an empty list or {@code null} is passed in, this method will have the same effect as
+ * {@link #clear()}.
+ *
+ * @param elements list of elements to replace the elements in the persisted buffer with.
+ */
+ public void replaceWith(@Nullable List<E> elements) {
+ if (elements != null && !elements.isEmpty()) {
+ accessor.replaceWith(elements);
+ } else {
+ accessor.clear();
+ }
+ }
+
+ /**
+ * Gets an unmodifiable view of the elements of the persisted buffer, as an {@link Iterable}.
+ *
+ * <p>This may return {@code null} if the buffer is empty or had been cleared (with {@link
+ * #clear()}).
+ *
+ * @return an unmodifiable view, as an {@link Iterable}, of the elements of the persisted buffer,
+ * or {@code null} if the buffer is empty or had been cleared.
+ */
+ @Nullable
+ public Iterable<E> view() {
+ final Iterable<E> view = accessor.view();
+ return (view != null) ? new UnmodifiableViewIterable<>(view) : null;
+ }
+
+ /** Clears all elements in the persisted buffer. */
+ public void clear() {
+ accessor.clear();
+ }
+
+ @ForRuntime
+ void setAccessor(AppendingBufferAccessor<E> newAccessor) {
+ this.accessor = Objects.requireNonNull(newAccessor);
+ }
+
+ private static final class NonFaultTolerantAccessor<E> implements AppendingBufferAccessor<E> {
+ private List<E> list;
+
+ @Override
+ public void append(E element) {
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+ list.add(element);
+ }
+
+ @Override
+ public void appendAll(List<E> elements) {
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+ list.addAll(elements);
+ }
+
+ @Override
+ public void replaceWith(List<E> elements) {
+ list = elements;
+ }
+
+ @Nullable
+ @Override
+ public Iterable<E> view() {
+ return list;
+ }
+
+ @Override
+ public void clear() {
+ list = null;
+ }
+ }
+
+ private static final class UnmodifiableViewIterable<E> implements Iterable<E> {
+
+ private final Iterable<E> delegate;
+
+ private UnmodifiableViewIterable(Iterable<E> delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return new UnmodifiableViewIterator<>(delegate.iterator());
+ }
+ }
+
+ private static final class UnmodifiableViewIterator<E> implements Iterator<E> {
+
+ private final Iterator<E> delegate;
+
+ UnmodifiableViewIterator(Iterator<E> delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return delegate.next();
+ }
+
+ public final void remove() {
+ throw new UnsupportedOperationException("This is an unmodifiable view.");
+ }
+ }
+}