Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/22 11:46:33 UTC

[GitHub] [beam] dmvk commented on a change in pull request #15984: [BEAM-2791] OrderedListState for Flink

dmvk commented on a change in pull request #15984:
URL: https://github.com/apache/beam/pull/15984#discussion_r773824038



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -419,7 +429,130 @@ public int hashCode() {
     }
   }
 
-  private static class FlinkBagState<K, T> implements BagState<T> {
+  private static class FlinkOrderedListState<T> implements OrderedListState<T> {
+    private final StateNamespace namespace;
+    // TODO: Consider instead storing MapStateDescriptor<Instant, T> - at least for some state
+    // backends this will
+    // be stored sorted, which could save time sorting.

Review comment:
       Do we want to keep this TODO? TBH I'm still not a big fan of this approach, as it relies on a side effect (some state backends happening to store the map sorted) that can change at any time without notice.
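
       To illustrate the concern, here is a minimal, self-contained sketch of the alternative: sort explicitly at read time, so the result does not depend on the order in which a backend happens to return elements. `Stamped` and `ExplicitOrderSketch` are hypothetical names used only for this example; they are not Beam or Flink classes, and this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a timestamped element; not Beam's TimestampedValue. */
final class Stamped<T> {
  final long timestampMillis;
  final T value;

  Stamped(long timestampMillis, T value) {
    this.timestampMillis = timestampMillis;
    this.value = value;
  }
}

/** Hypothetical helper: the ordering guarantee lives in this code, not in the storage layer. */
final class ExplicitOrderSketch {
  /**
   * Returns the elements with minMillis <= timestamp < limitMillis, sorted by timestamp,
   * regardless of the order in which {@code stored} yields them.
   */
  static <T> List<Stamped<T>> readRange(
      Iterable<Stamped<T>> stored, long minMillis, long limitMillis) {
    List<Stamped<T>> result = new ArrayList<>();
    for (Stamped<T> element : stored) {
      if (element.timestampMillis >= minMillis && element.timestampMillis < limitMillis) {
        result.add(element);
      }
    }
    // Explicit sort: correctness does not depend on backend iteration order.
    result.sort(Comparator.comparingLong((Stamped<T> e) -> e.timestampMillis));
    return result;
  }
}
```

       The cost is a sort on every read, which is what the TODO hopes to avoid, but the ordering then holds for any backend.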

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -419,7 +429,130 @@ public int hashCode() {
     }
   }
 
-  private static class FlinkBagState<K, T> implements BagState<T> {
+  private static class FlinkOrderedListState<T> implements OrderedListState<T> {
+    private final StateNamespace namespace;
+    // TODO: Consider instead storing MapStateDescriptor<Instant, T> - at least for some state
+    // backends this will
+    // be stored sorted, which could save time sorting.
+    private final ListStateDescriptor<TimestampedValue<T>> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkOrderedListState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        String stateId,
+        StateNamespace namespace,
+        Coder<T> coder,
+        SerializablePipelineOptions pipelineOptions) {
+      this.namespace = namespace;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateDescriptor =
+          new ListStateDescriptor<>(
+              stateId, new CoderTypeSerializer<>(TimestampedValueCoder.of(coder), pipelineOptions));
+    }
+
+    @Override
+    public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+      return readAsMap().subMap(minTimestamp, limitTimestamp).values();

Review comment:
       OT: I've always missed a more sophisticated lifecycle for these states, something that would allow us to cache the map and invalidate it when it's no longer needed (e.g. when we start processing the next key).
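
       A rough sketch of the kind of lifecycle I mean, assuming the caller knows the current key. `KeyScopedCache` is a hypothetical name for illustration only; nothing like it exists in Beam's or Flink's state API as far as this comment is concerned.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical cache that holds one value and drops it when the key changes. */
final class KeyScopedCache<K, V> {
  private K cachedKey;
  private V cachedValue;
  private boolean populated;

  /** Returns the cached value for currentKey, reloading it if the key has changed. */
  V get(K currentKey, Function<K, V> loader) {
    if (!populated || !Objects.equals(cachedKey, currentKey)) {
      cachedValue = loader.apply(currentKey);
      cachedKey = currentKey;
      populated = true;
    }
    return cachedValue;
  }

  /** Drops the cached value, e.g. after a write to the underlying state. */
  void invalidate() {
    populated = false;
    cachedKey = null;
    cachedValue = null;
  }
}
```

       With something like this, `readRange` could reuse the sorted map across calls for the same key instead of rebuilding it each time. Any write to the state (`add`, `clear`, `clearRange`) would have to call `invalidate()` or update the cached map as well.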



