You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/14 04:09:02 UTC

[GitHub] [flink] shuiqiangchen commented on a change in pull request #13140: [FLINK-18884][python] Add chaining strategy and slot sharing group in…

shuiqiangchen commented on a change in pull request #13140:
URL: https://github.com/apache/flink/pull/13140#discussion_r470399135



##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -322,6 +323,86 @@ def test_keyed_stream_partitioning(self):
         with self.assertRaises(Exception):
             keyed_stream.forward()
 
+    def test_slot_sharing_group(self):
+        """Check that slot_sharing_group() is recorded on the generated stream nodes.
+
+        The collection source is assigned slot_sharing_group_1 and the map operator
+        slot_sharing_group_2; both values are then read back from the Java StreamGraph.
+        """
+        source_operator_name = 'collection source'
+        map_operator_name = 'map_operator'
+        slot_sharing_group_1 = 'slot_sharing_group_1'
+        slot_sharing_group_2 = 'slot_sharing_group_2'
+        ds_1 = self.env.from_collection([1, 2, 3]).name(source_operator_name)
+        # Put the source in group 1, then give the downstream map (parallelism 3) group 2,
+        # so the two operators should end up in different slot sharing groups.
+        ds_1.slot_sharing_group(slot_sharing_group_1).map(lambda x: x + 1).set_parallelism(3)\
+            .name(map_operator_name).slot_sharing_group(slot_sharing_group_2)\
+            .add_sink(self.test_sink)
+
+        # Build the StreamGraph through the underlying Java environment.
+        # NOTE(review): the job name "test start new_chain" looks copied from the chaining
+        # test; the boolean argument presumably controls whether transformations are
+        # cleared afterwards -- confirm against the Java getStreamGraph signature.
+        j_generated_stream_graph = self.env._j_stream_execution_environment \
+            .getStreamGraph("test start new_chain", True)
+
+        j_stream_nodes = list(j_generated_stream_graph.getStreamNodes().toArray())
+        # Match nodes by operator name and compare their slot sharing group.
+        # NOTE(review): if neither name matches, this loop asserts nothing; consider also
+        # asserting that both the source and the map node were found.
+        for j_stream_node in j_stream_nodes:
+            if j_stream_node.getOperatorName() == source_operator_name:
+                self.assertEqual(j_stream_node.getSlotSharingGroup(), slot_sharing_group_1)
+            elif j_stream_node.getOperatorName() == map_operator_name:
+                self.assertEqual(j_stream_node.getSlotSharingGroup(), slot_sharing_group_2)
+
+    def test_chaining_strategy(self):
+        chained_operator_name = "map_operator_1"
+        chained_operator_name_1 = "map_operator_2"
+
+        ds = self.env.from_collection([1, 2, 3])
+        ds.set_parallelism(2).map(lambda x: x).set_parallelism(2)\
+            .name(chained_operator_name).map(lambda x: x).set_parallelism(2)\
+            .name(chained_operator_name_1).add_sink(self.test_sink)
+
+        def assert_chainable(j_stream_graph, expected_upstream_chainable,
+                             expected_downstream_chainable):
+            j_stream_nodes = list(j_stream_graph.getStreamNodes().toArray())
+            for j_stream_node in j_stream_nodes:
+                if j_stream_node.getOperatorName() == chained_operator_name:
+                    JStreamingJobGraphGenerator = get_gateway().jvm \
+                        .org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
+
+                    j_in_stream_edge = j_stream_node.getInEdges().get(0)
+                    upstream_chainable = JStreamingJobGraphGenerator.isChainable(j_in_stream_edge,
+                                                                                 j_stream_graph)
+                    self.assertEqual(expected_upstream_chainable, upstream_chainable)
+
+                    j_out_stream_edge = j_stream_node.getOutEdges().get(0)
+                    downstream_chainable = JStreamingJobGraphGenerator.isChainable(
+                        j_out_stream_edge, j_stream_graph)
+                    self.assertEqual(expected_downstream_chainable, downstream_chainable)
+
+        # The map_operator_1 has the same parallelism with source operator and map_operator_2, and
+        # ship_strategy for collection source and map_operator_1 is FORWARD, so the map_operator_1
+        # can be chained with collection source and map_operator_2.
+        j_generated_stream_graph = self.env._j_stream_execution_environment\
+            .getStreamGraph("test start new_chain", True)
+        assert_chainable(j_generated_stream_graph, True, True)
+
+        ds = self.env.from_collection([1, 2, 3])
+        # Start a new chain for map_operator_1
+        ds.set_parallelism(2).map(lambda x: x).set_parallelism(2) \

Review comment:
       Yes, it should be followed by a new operation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org