You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/10 06:07:23 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #13098: [FLINK-18866][python] Support filter() operation for Python DataStrea…

hequn8128 commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r467701714



##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -149,6 +149,33 @@ def flat_map(value):
         expected.sort()
         self.assertEqual(expected, results)
 
+    def test_filter_without_data_types(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
+        filtered_stream = ds.filter(MyFilterFunction())
+        collect_util = DataStreamCollectUtil()
+        collect_util.collect(filtered_stream)
+        self.env.execute("test filter")
+        results = collect_util.results()
+        expected = ["(2, 'Hello', 'Hi')"]
+        results.sort()
+        expected.sort()
+        self.assertEqual(expected, results)
+
+    def test_filter_with_data_types(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
+                                      type_info=Types.ROW(
+                                          [Types.INT(), Types.STRING(), Types.STRING()])
+                                      )
+        filtered_stream = ds.filter(MyFilterFunction())
+        collect_util = DataStreamCollectUtil()
+        collect_util.collect(filtered_stream)
+        self.env.execute("test filter")
+        results = collect_util.results()
+        expected = ['2,Hello,Hi']
+        results.sort()
+        expected.sort()
+        self.assertEqual(expected, results)

Review comment:
       Change one of these two tests to pass a plain Callable as the filter function, so that the Callable scenario is covered as well.

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the

Review comment:
       elemetns => elements

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the
+    predicate is applied. Violating this assumption can lead to incoorect results.
+    """
+
+    @abc.abstractmethod
+    def filter(self, value):
+        """
+        The filter function that evaluates the predicate.
+
+        :param value: The value to be filtered.
+        :return: Tre for values that should be retained, false for values to be filtered out.

Review comment:
       Tre => True

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;

Review comment:
       Remove `;`

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())
+    Note that the system assumes that the function does not modify the elemetns on which the
+    predicate is applied. Violating this assumption can lead to incoorect results.

Review comment:
       incoorect => incorrect

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +233,32 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        """
+        Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
+        for each element of the DataStream and retains only those element for which the function
+        returns true. Elements for which the function returns false are filtered. The user can also
+        extend RichFilterFunction to gain access to other features provided by the RichFunction
+        interface.
+
+        :param func: The FilterFunction that is called for each element of the DataStream.
+        :return: The filtered DataStream.
+        """
+        class FilterFlatMap(FlatMapFunction):
+            def __init__(self, filter_func):
+                self._func = filter_func
+
+            def flat_map(self, value):
+                if self._func.filter(value):
+                    yield value
+
+        j_input_type = self._j_data_stream.getTransformation().getOutputType()
+        type_info = typeinfo._from_java_type(j_input_type)
+        j_data_stream = self.flat_map(FilterFlatMap(func), type_info=type_info)._j_data_stream

Review comment:
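       What if `func` is a plain `Callable`? `FilterFlatMap.flat_map` calls `self._func.filter(value)`, which only works when `func` is a `FilterFunction` instance.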

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -83,6 +83,29 @@ def flat_map(self, value):
         pass
 
 
+class FilterFunction(Function):
+    """
+    A filter function is a predicate applied individually to each record. The predicate decides
+    whether to keep the element, or to discard it.
+    The basic syntax for using a FilterFunction is as follows:
+    :
+         >>> ds = ...;
+         >>> result = ds.filter(new MyFilterFunction())

Review comment:
       Remove `new`

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -140,6 +163,19 @@ def flat_map(self, value):
         return self._func(value)
 
 
+class FilterFunctionWrapper(FunctionWrapper):

Review comment:
       This class is never used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org