You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/14 02:38:10 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #13140: [FLINK-18884][python] Add chaining strategy and slot sharing group in…

hequn8128 commented on a change in pull request #13140:
URL: https://github.com/apache/flink/pull/13140#discussion_r470377742



##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -322,6 +323,86 @@ def test_keyed_stream_partitioning(self):
         with self.assertRaises(Exception):
             keyed_stream.forward()
 
+    def test_slot_sharing_group(self):
+        """Check that slot_sharing_group() is recorded on the expected stream nodes.
+
+        The collection source is assigned ``slot_sharing_group_1`` and the
+        downstream map operator is assigned ``slot_sharing_group_2``; the test
+        then inspects the Java StreamGraph produced by the execution
+        environment and compares each node's slot sharing group.
+        """
+        source_operator_name = 'collection source'
+        map_operator_name = 'map_operator'
+        slot_sharing_group_1 = 'slot_sharing_group_1'
+        slot_sharing_group_2 = 'slot_sharing_group_2'
+        ds_1 = self.env.from_collection([1, 2, 3]).name(source_operator_name)
+        # Pipeline: source (group 1) -> map with parallelism 3 (group 2) -> test sink.
+        ds_1.slot_sharing_group(slot_sharing_group_1).map(lambda x: x + 1).set_parallelism(3)\
+            .name(map_operator_name).slot_sharing_group(slot_sharing_group_2)\
+            .add_sink(self.test_sink)
+
+        # Generate the Java-side StreamGraph for the pipeline defined above.
+        # NOTE(review): the job name "test start new_chain" is the same string used by
+        # test_chaining_strategy; a name matching this test would be clearer.
+        j_generated_stream_graph = self.env._j_stream_execution_environment \
+            .getStreamGraph("test start new_chain", True)
+
+        # Match nodes by operator name and check the slot sharing group recorded on each.
+        # NOTE(review): if no node matches either name, no assertion runs and the test
+        # passes vacuously; consider also asserting that both nodes were found.
+        j_stream_nodes = list(j_generated_stream_graph.getStreamNodes().toArray())
+        for j_stream_node in j_stream_nodes:
+            if j_stream_node.getOperatorName() == source_operator_name:
+                self.assertEqual(j_stream_node.getSlotSharingGroup(), slot_sharing_group_1)
+            elif j_stream_node.getOperatorName() == map_operator_name:
+                self.assertEqual(j_stream_node.getSlotSharingGroup(), slot_sharing_group_2)
+
+    def test_chaining_strategy(self):
+        chained_operator_name = "map_operator_1"
+        chained_operator_name_1 = "map_operator_2"
+
+        ds = self.env.from_collection([1, 2, 3])
+        ds.set_parallelism(2).map(lambda x: x).set_parallelism(2)\
+            .name(chained_operator_name).map(lambda x: x).set_parallelism(2)\
+            .name(chained_operator_name_1).add_sink(self.test_sink)
+
+        def assert_chainable(j_stream_graph, expected_upstream_chainable,
+                             expected_downstream_chainable):
+            j_stream_nodes = list(j_stream_graph.getStreamNodes().toArray())
+            for j_stream_node in j_stream_nodes:
+                if j_stream_node.getOperatorName() == chained_operator_name:
+                    JStreamingJobGraphGenerator = get_gateway().jvm \
+                        .org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
+
+                    j_in_stream_edge = j_stream_node.getInEdges().get(0)
+                    upstream_chainable = JStreamingJobGraphGenerator.isChainable(j_in_stream_edge,
+                                                                                 j_stream_graph)
+                    self.assertEqual(expected_upstream_chainable, upstream_chainable)
+
+                    j_out_stream_edge = j_stream_node.getOutEdges().get(0)
+                    downstream_chainable = JStreamingJobGraphGenerator.isChainable(
+                        j_out_stream_edge, j_stream_graph)
+                    self.assertEqual(expected_downstream_chainable, downstream_chainable)
+
+        # The map_operator_1 has the same parallelism with source operator and map_operator_2, and
+        # ship_strategy for collection source and map_operator_1 is FORWARD, so the map_operator_1
+        # can be chained with collection source and map_operator_2.
+        j_generated_stream_graph = self.env._j_stream_execution_environment\
+            .getStreamGraph("test start new_chain", True)
+        assert_chainable(j_generated_stream_graph, True, True)
+
+        ds = self.env.from_collection([1, 2, 3])
+        # Start a new chain for map_operator_1
+        ds.set_parallelism(2).map(lambda x: x).set_parallelism(2) \

Review comment:
       Setting the parallelism on this source `ds` has no effect, since the parallelism of `from_collection` is always one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org