Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/07 09:55:56 UTC

[GitHub] TisonKun commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

URL: https://github.com/apache/flink/pull/6883#discussion_r254611464
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 ##########
 @@ -317,6 +318,96 @@ public void testSchedulingAllAtOnce() throws Exception {
 		}
 	}
 
+
+	/**
+	 * This job runs N senders and N receivers in only N slots.
+	 * With 2 * N tasks competing for N slots, it can complete only if
+	 * slot sharing lets each sender/receiver pair occupy a single slot.
+	 */
+	@Test
+	public void testSlotSharingForForwardJob() throws Exception {
+		final int parallelism = 11;
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(parallelism)
+			.setConfiguration(getDefaultConfiguration())
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender = new JobVertex("Sender");
+			sender.setInvokableClass(CountDownLatchedSender.class);
+			sender.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("Receiver");
+			receiver.setInvokableClass(CountDownLatchedReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+
+			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+			CountDownLatchedSender.setLatch(countDownLatch);
+			CountDownLatchedReceiver.setLatch(countDownLatch);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			receiver.setStrictlyCoLocatedWith(sender);
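
The hunk above is truncated: it ends before the job is submitted, and the `CountDownLatchedSender` / `CountDownLatchedReceiver` invokables it references are not shown. As a hedged sketch only, assuming the test-invokable conventions of Flink's runtime tests (the class body and job name below are illustrative, not the PR's actual code), the receiver side of the latch pattern could look like this:

	// Assumed imports: org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable,
	// org.apache.flink.runtime.execution.Environment,
	// org.apache.flink.runtime.io.network.api.reader.RecordReader,
	// org.apache.flink.types.IntValue, java.util.concurrent.CountDownLatch.

	// Illustrative receiver invokable: drains its input, then counts down the
	// shared static latch so the test can observe that all receivers finished.
	public static class CountDownLatchedReceiver extends AbstractInvokable {

		private static volatile CountDownLatch latch;

		public static void setLatch(CountDownLatch latch) {
			CountDownLatchedReceiver.latch = latch;
		}

		public CountDownLatchedReceiver(Environment environment) {
			super(environment);
		}

		@Override
		public void invoke() throws Exception {
			final RecordReader<IntValue> reader = new RecordReader<>(
				getEnvironment().getInputGate(0),
				IntValue.class,
				getEnvironment().getTaskManagerInfo().getTmpDirectories());

			while (reader.next() != null) {
				// drain all records from the corresponding sender
			}
			latch.countDown();
		}
	}

The test itself would then presumably build a JobGraph from the two vertices and run it to completion:

	// 2 * parallelism tasks in parallelism slots: this can only finish because
	// the shared SlotSharingGroup lets each sender/receiver pair share a slot.
	miniCluster.executeJobBlocking(new JobGraph("ForwardJob", sender, receiver));

The static latch is what lets the test assert completion across all parallel subtasks without plumbing any state through the JobGraph itself.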
 
 Review comment:
  Previously, slot sharing with co-location constraints was only covered in `CoLocationConstraintITCase.scala`. For test coverage, should I add a test case that is nearly the same as this one, except without the statement `receiver.setStrictlyCoLocatedWith(sender)`?
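
  For reference, a minimal sketch of what that additional case could look like (illustrative only: the method name is invented here, and the body mirrors the test above minus the co-location constraint):

	@Test
	public void testSlotSharingForForwardJobWithoutCoLocation() throws Exception {
		final int parallelism = 11;

		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
			.setNumTaskManagers(1)
			.setNumSlotsPerTaskManager(parallelism)
			.setConfiguration(getDefaultConfiguration())
			.build();

		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
			miniCluster.start();

			final JobVertex sender = new JobVertex("Sender");
			sender.setInvokableClass(CountDownLatchedSender.class);
			sender.setParallelism(parallelism);

			final JobVertex receiver = new JobVertex("Receiver");
			receiver.setInvokableClass(CountDownLatchedReceiver.class);
			receiver.setParallelism(parallelism);

			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
				ResultPartitionType.PIPELINED);

			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
			CountDownLatchedSender.setLatch(countDownLatch);
			CountDownLatchedReceiver.setLatch(countDownLatch);

			// Plain slot sharing only: senders and receivers may share slots,
			// but each receiver is free to run in any slot of the group.
			// receiver.setStrictlyCoLocatedWith(sender) is deliberately omitted.
			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
			sender.setSlotSharingGroup(sharingGroup);
			receiver.setSlotSharingGroup(sharingGroup);

			miniCluster.executeJobBlocking(new JobGraph("ForwardJob", sender, receiver));
		}
	}

  Both variants should pass with slot sharing enabled; the co-location variant additionally pins each receiver subtask into the same slot as its paired sender.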

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services