Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/27 10:03:44 UTC

[GitHub] [flink] WeiZhong94 commented on a change in pull request #14758: [FLINK-21112][python] Add ValueState/ListState/MapState and corresponding StateDescriptors for Python DataStream API

WeiZhong94 commented on a change in pull request #14758:
URL: https://github.com/apache/flink/pull/14758#discussion_r565170564



##########
File path: flink-python/pyflink/common/state.py
##########
@@ -94,6 +98,29 @@ def add(self, value: T) -> None:
         """
         pass
 
+
+class MergingState(AppendingState[IN, OUT], ABC):

Review comment:
       ```suggestion
   class MergingState(AppendingState[IN, OUT]):
   ```
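
      For context, the explicit `ABC` base is redundant: assuming `AppendingState` already derives from `ABC` (which the suggestion implies), a subclass stays abstract on its own as long as it leaves an abstract method unimplemented. A minimal self-contained sketch of that behavior; the class bodies are illustrative, not the actual PyFlink definitions:

      ```python
      from abc import ABC, abstractmethod
      from typing import Generic, TypeVar

      IN = TypeVar('IN')
      OUT = TypeVar('OUT')


      class AppendingState(ABC, Generic[IN, OUT]):
          @abstractmethod
          def get(self) -> OUT:
              pass


      # No explicit ABC base needed: the inherited abstract method keeps
      # this class abstract until it is implemented.
      class MergingState(AppendingState[IN, OUT]):
          pass


      try:
          MergingState()
      except TypeError as e:
          print(e)  # Can't instantiate abstract class MergingState ...
      ```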

##########
File path: flink-python/pyflink/common/state.py
##########
@@ -67,17 +71,17 @@ def update(self, value: T) -> None:
         pass
 
 
-class ListState(State, Generic[T]):
+class AppendingState(State, Generic[IN, OUT]):
     """
-    :class:`State` interface for partitioned list state in Operations.
-    The state is accessed and modified by user functions, and checkpointed consistently
-    by the system as part of the distributed snapshots.
+    Base interface for partitioned state that supports adding elements and inspecting the current
+    state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 
-    Currently only keyed list state is supported.
+    This state is accessed and modified by user functions, and checkpointed consistently by the
+    system as part of the distributed snapshots.
 
-    When it is a keyed list state, the state key is automatically supplied by the system, so the
-    user function always sees the value mapped to the key of the current element. That way, the
-    system can handle stream and state partitioning consistently together.
+    The state is only accessible by functions applied on a KeyedStream. The key is automatically
+    supplied by the system, so the function always sees the value mapped to the key of the current
+    element. That way, the system can handle stream and state partitioning consistently together.
     """
 
     @abstractmethod

Review comment:
      ```suggestion
      def get(self) -> OUT:
      ```
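
      To make the suggested signature concrete: with two type variables, `get` returns whatever the concrete state exposes, so list-like and aggregating states can share one interface. A self-contained toy sketch (`InMemoryListState` is illustrative, not PyFlink code; it mirrors the shape of Flink's Java `ListState<T> extends MergingState<T, Iterable<T>>`):

      ```python
      from abc import ABC, abstractmethod
      from typing import Generic, Iterable, List, TypeVar

      IN = TypeVar('IN')
      OUT = TypeVar('OUT')
      T = TypeVar('T')


      class AppendingState(ABC, Generic[IN, OUT]):
          @abstractmethod
          def get(self) -> OUT:
              """Returns the current value; its type is the OUT type variable."""
              pass

          @abstractmethod
          def add(self, value: IN) -> None:
              pass


      # A list-like state buffers its elements: IN = T, OUT = Iterable[T].
      class InMemoryListState(AppendingState[T, Iterable[T]]):
          def __init__(self) -> None:
              self._buffer: List[T] = []

          def get(self) -> Iterable[T]:
              return list(self._buffer)

          def add(self, value: T) -> None:
              self._buffer.append(value)


      state: AppendingState[int, Iterable[int]] = InMemoryListState()
      state.add(1)
      state.add(2)
      print(list(state.get()))  # [1, 2]
      ```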

##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -500,6 +503,43 @@ def generate_func(self, serialized_fn) -> Tuple:
             self.keyed_state_backend)
         return func, [proc_func]
 
+    def open(self):
+        for user_defined_func in self.user_defined_funcs:
+            if hasattr(user_defined_func, 'open'):
+                runtime_context = KeyedProcessFunctionOperation.InternalRuntimeContext(
+                    self.spec.serialized_fn.runtime_context.task_name,
+                    self.spec.serialized_fn.runtime_context.task_name_with_subtasks,
+                    self.spec.serialized_fn.runtime_context.number_of_parallel_subtasks,
+                    self.spec.serialized_fn.runtime_context.max_number_of_parallel_subtasks,
+                    self.spec.serialized_fn.runtime_context.index_of_this_subtask,
+                    self.spec.serialized_fn.runtime_context.attempt_number,
+                    {p.key: p.value for p in
+                     self.spec.serialized_fn.runtime_context.job_parameters},
+                    self.keyed_state_backend)
+                user_defined_func.open(runtime_context)
+
+    class InternalRuntimeContext(RuntimeContext):

Review comment:
      The `InternalRuntimeContext` class should be extracted to the outer (module) scope, as other operations will use it in the future.
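
      A sketch of the suggested extraction, with `InternalRuntimeContext` moved to module level so that other operation classes can construct it as well. The stand-in base classes and the shortened constructor exist only to keep the sketch self-contained; the real signature is the one shown in the diff above:

      ```python
      from typing import Dict


      # Stand-ins so the sketch runs on its own; the real classes live in pyflink.
      class RuntimeContext:
          def __init__(self, task_name: str, job_parameters: Dict[str, str]):
              self.task_name = task_name
              self.job_parameters = job_parameters


      class RemoteKeyedStateBackend:
          pass


      # Module level rather than nested in KeyedProcessFunctionOperation, so any
      # future operation can build one without reaching into another class.
      class InternalRuntimeContext(RuntimeContext):
          def __init__(self, task_name: str, job_parameters: Dict[str, str],
                       keyed_state_backend: RemoteKeyedStateBackend):
              super(InternalRuntimeContext, self).__init__(task_name, job_parameters)
              self._keyed_state_backend = keyed_state_backend
      ```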

##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -500,6 +503,43 @@ def generate_func(self, serialized_fn) -> Tuple:
             self.keyed_state_backend)
         return func, [proc_func]
 
+    def open(self):
+        for user_defined_func in self.user_defined_funcs:
+            if hasattr(user_defined_func, 'open'):
+                runtime_context = KeyedProcessFunctionOperation.InternalRuntimeContext(
+                    self.spec.serialized_fn.runtime_context.task_name,
+                    self.spec.serialized_fn.runtime_context.task_name_with_subtasks,
+                    self.spec.serialized_fn.runtime_context.number_of_parallel_subtasks,
+                    self.spec.serialized_fn.runtime_context.max_number_of_parallel_subtasks,
+                    self.spec.serialized_fn.runtime_context.index_of_this_subtask,
+                    self.spec.serialized_fn.runtime_context.attempt_number,
+                    {p.key: p.value for p in
+                     self.spec.serialized_fn.runtime_context.job_parameters},
+                    self.keyed_state_backend)
+                user_defined_func.open(runtime_context)
+
+    class InternalRuntimeContext(RuntimeContext):
+
+        def __init__(self, task_name: str, task_name_with_subtasks: str,
+                     number_of_parallel_subtasks: int, max_number_of_parallel_subtasks: int,
+                     index_of_this_subtask: int, attempt_number: int,
+                     job_parameters: Dict[str, str], keyed_state_backend: RemoteKeyedStateBackend):
+            super(KeyedProcessFunctionOperation.InternalRuntimeContext, self).__init__(
+                task_name, task_name_with_subtasks, number_of_parallel_subtasks,
+                max_number_of_parallel_subtasks, index_of_this_subtask, attempt_number,
+                job_parameters)
+            self.keyed_state_backend = keyed_state_backend

Review comment:
      The private member should have a `_` prefix. Users can access the instance of this class from their own code, so we should eliminate unnecessary confusion.
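
      To illustrate the naming point: user functions receive this object in `open(runtime_context)`, so an attribute without the underscore reads like supported public API. A tiny runnable sketch of the convention (`get_state` and the backend method here are hypothetical placeholders, not PyFlink's actual API):

      ```python
      class RemoteKeyedStateBackend:
          def get_value_state(self, name):
              return "value-state:" + name  # placeholder for a real state handle


      class InternalRuntimeContext:
          def __init__(self, keyed_state_backend):
              # Leading "_" marks the backend as internal, not user-facing API.
              self._keyed_state_backend = keyed_state_backend

          def get_state(self, name):
              # Users go through methods like this and never touch the backend.
              return self._keyed_state_backend.get_value_state(name)


      ctx = InternalRuntimeContext(RemoteKeyedStateBackend())
      print(ctx.get_state("my-state"))  # value-state:my-state
      ```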




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org