Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/19 11:55:50 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #19743: [FLINK-27657][python] Implement remote operator state backend in PyFlink

dianfu commented on code in PR #19743:
URL: https://github.com/apache/flink/pull/19743#discussion_r876949022


##########
flink-python/pyflink/datastream/state.py:
##########
@@ -281,6 +283,110 @@ def __iter__(self) -> Iterator[K]:
         return iter(self.keys())
 
 
+class ReadOnlyBroadcastState(State, Generic[K, V]):
+    """
+    A read-only view of the :class:`BroadcastState`.
+    Although read-only, the user code should not modify the value returned by the :meth:`get` or the
+    items returned by :meth:`items`, as this can lead to inconsistent states. The reason for this is
+    that we do not create extra copies of the elements for performance reasons.
+    """
+
+    @abstractmethod
+    def get(self, key: K) -> V:
+        """
+        Returns the current value associated with the given key.
+        """
+        pass
+
+    @abstractmethod
+    def contains(self, key: K) -> bool:
+        """
+        Returns whether there exists the given mapping.
+        """
+        pass
+
+    @abstractmethod
+    def items(self) -> Iterable[Tuple[K, V]]:
+        """
+        Returns all the mappings in the state.
+        """
+        pass
+
+    @abstractmethod
+    def keys(self) -> Iterable[K]:

Review Comment:
   The methods in this class differ from those of the Java `ReadOnlyBroadcastState`. Is this intentional? If so, could you explain the reasoning a bit? This is a public class, so we need to be careful with its API.
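
   For comparison, the Java `ReadOnlyBroadcastState` exposes only `get(K)`, `contains(K)` and `immutableEntries()`. A minimal sketch, assuming a Python API that mirrors it one-to-one with snake_case names, might look like this. The class names `JavaStyleReadOnlyBroadcastState` and `DictReadOnlyBroadcastState` are hypothetical and are not part of PyFlink or of this PR.

```python
from abc import ABC, abstractmethod
from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class JavaStyleReadOnlyBroadcastState(ABC, Generic[K, V]):
    """Hypothetical interface mirroring Java's ReadOnlyBroadcastState one-to-one."""

    @abstractmethod
    def get(self, key: K) -> Optional[V]:
        """Counterpart of Java's get(K)."""

    @abstractmethod
    def contains(self, key: K) -> bool:
        """Counterpart of Java's contains(K)."""

    @abstractmethod
    def immutable_entries(self) -> Iterable[Tuple[K, V]]:
        """Counterpart of Java's immutableEntries()."""


class DictReadOnlyBroadcastState(JavaStyleReadOnlyBroadcastState[K, V]):
    """Hypothetical in-memory implementation backed by a plain dict."""

    def __init__(self, data: Dict[K, V]):
        self._data = data

    def get(self, key: K) -> Optional[V]:
        # In this sketch a missing key yields None rather than raising.
        return self._data.get(key)

    def contains(self, key: K) -> bool:
        return key in self._data

    def immutable_entries(self) -> Iterable[Tuple[K, V]]:
        # Return a tuple snapshot so callers cannot mutate the mapping through it.
        return tuple(self._data.items())


state = DictReadOnlyBroadcastState({"rule-1": "x > 10"})
print(state.get("rule-1"), state.contains("rule-2"), list(state.immutable_entries()))
```

   The trade-off behind the question: Java-aligned names make the two APIs easy to cross-reference, while the `items()`/`keys()`/`values()` style in this diff appears to follow the `MapState` conventions just above it in `state.py`.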



##########
flink-python/pyflink/datastream/state.py:
##########
@@ -281,6 +283,110 @@ def __iter__(self) -> Iterator[K]:
         return iter(self.keys())
 
 
+class ReadOnlyBroadcastState(State, Generic[K, V]):
+    """
+    A read-only view of the :class:`BroadcastState`.
+    Although read-only, the user code should not modify the value returned by the :meth:`get` or the
+    items returned by :meth:`items`, as this can lead to inconsistent states. The reason for this is
+    that we do not create extra copies of the elements for performance reasons.
+    """
+
+    @abstractmethod
+    def get(self, key: K) -> V:
+        """
+        Returns the current value associated with the given key.
+        """
+        pass
+
+    @abstractmethod
+    def contains(self, key: K) -> bool:
+        """
+        Returns whether there exists the given mapping.
+        """
+        pass
+
+    @abstractmethod
+    def items(self) -> Iterable[Tuple[K, V]]:
+        """
+        Returns all the mappings in the state.
+        """
+        pass
+
+    @abstractmethod
+    def keys(self) -> Iterable[K]:
+        """
+        Returns all the keys in the state.
+        """
+        pass
+
+    @abstractmethod
+    def values(self) -> Iterable[V]:
+        """
+        Returns all the values in the state.
+        """
+        pass
+
+    @abstractmethod
+    def is_empty(self) -> bool:
+        """
+        Returns true if this state contains no key-value mappings, otherwise false.
+        """
+        pass
+
+    def __getitem__(self, key: K) -> V:
+        return self.get(key)
+
+    def __contains__(self, key: K) -> bool:
+        return self.contains(key)
+
+    def __iter__(self) -> Iterator[K]:
+        return iter(self.keys())
+
+
+class BroadcastState(ReadOnlyBroadcastState[K, V]):

Review Comment:
   Ditto. Why does the API of this class differ from the Java one?
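
   Java's `BroadcastState` extends `ReadOnlyBroadcastState` with `put(K, V)`, `putAll(Map)`, `remove(K)`, `iterator()` and `entries()`, plus `clear()` inherited from `State`. As a sketch only, a Python counterpart aligned with it might look like the following; `DictBroadcastState` is a hypothetical dict-backed class, not the PyFlink implementation in this PR.

```python
from typing import Dict, Generic, Iterable, Iterator, Optional, Tuple, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class DictBroadcastState(Generic[K, V]):
    """Hypothetical dict-backed state whose methods mirror Java's BroadcastState."""

    def __init__(self) -> None:
        self._data: Dict[K, V] = {}

    # Methods inherited from Java's ReadOnlyBroadcastState.
    def get(self, key: K) -> Optional[V]:
        return self._data.get(key)

    def contains(self, key: K) -> bool:
        return key in self._data

    def immutable_entries(self) -> Iterable[Tuple[K, V]]:
        return tuple(self._data.items())

    # Methods added by Java's BroadcastState.
    def put(self, key: K, value: V) -> None:
        self._data[key] = value

    def put_all(self, mapping: Dict[K, V]) -> None:
        self._data.update(mapping)

    def remove(self, key: K) -> None:
        # In this sketch, removing an absent key is a no-op rather than an error.
        self._data.pop(key, None)

    def iterator(self) -> Iterator[Tuple[K, V]]:
        # Iterate over a snapshot so put/remove during iteration does not raise.
        return iter(list(self._data.items()))

    def entries(self) -> Iterable[Tuple[K, V]]:
        return self._data.items()

    # Method from Java's State interface.
    def clear(self) -> None:
        self._data.clear()


rules = DictBroadcastState()
rules.put("r1", "x > 10")
rules.put_all({"r2": "y < 5", "r3": "z == 0"})
rules.remove("r3")
print(sorted(rules.entries()))
```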



##########
flink-python/pyflink/datastream/__init__.py:
##########
@@ -141,6 +141,11 @@
     - :class:`state.AggregatingState`:
       Interface for aggregating state, based on an :class:`AggregateFunction`. Elements that are
       added to this type of state will be eagerly pre-aggregated using a given AggregateFunction.
+    - :class:`state.BroadcastState`:
+      A type of state that can be created to store the state of a :class:`BroadcastStream`. This
+      state assumes that the same elements are sent to all instances of an operator.
+    - :class:`state.ReadOnlyBroadcastState`:

Review Comment:
   I guess users don't need to import this class manually, so there is no need to expose it here.
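
   To illustrate the point: user code only calls methods on the state object the runtime hands it, so it never has to name the class. In this minimal sketch, `FakeReadOnlyContext`, its `get_broadcast_state(name)` method and `process_element` are hypothetical stand-ins, and a read-only `MappingProxyType` plays the role of the read-only broadcast state, relying only on the `__getitem__`/`__contains__` protocol that the diff also defines.

```python
from types import MappingProxyType
from typing import Dict


class FakeReadOnlyContext:
    """Hypothetical stand-in for the context the runtime passes to user functions."""

    def __init__(self, broadcast_data: Dict[str, str]):
        self._broadcast_data = broadcast_data

    def get_broadcast_state(self, name: str):
        # The runtime, not the user, decides the concrete read-only state type;
        # a read-only mapping view stands in for it here.
        return MappingProxyType(self._broadcast_data)


def process_element(value: str, ctx: FakeReadOnlyContext) -> str:
    # No import of ReadOnlyBroadcastState is needed: user code only uses
    # `in` and `[]` on whatever object the context returns.
    rules = ctx.get_broadcast_state("rules")
    return rules[value] if value in rules else "no rule"


ctx = FakeReadOnlyContext({"sensor-1": "x > 10"})
print(process_element("sensor-1", ctx), process_element("sensor-2", ctx))
```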



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.