You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/13 08:53:21 UTC

[GitHub] tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase

tillrohrmann commented on a change in pull request #6883: [FLINK-10610] [tests] Port slot sharing cases to new codebase
URL: https://github.com/apache/flink/pull/6883#discussion_r256295712
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 ##########
 @@ -337,6 +347,262 @@ public void testSchedulingAllAtOnce() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSlotSharingForForwardJobWithCoLocationConstraint() throws Exception {
+		testSlotSharingForForwardJob(true);
+	}
+
+	@Test
+	public void testSlotSharingForForwardJobWithoutCoLocationConstraint() throws Exception {
+		testSlotSharingForForwardJob(false);
+	}
+
+	/**
+	 * This job runs in N slots with N senders and N receivers.
+	 * Unless slot sharing is used, it cannot complete.
+	 * Either with or without co-location constraint should not
+	 * make difference.
+	 */
+	private void testSlotSharingForForwardJob(boolean withCoLocationConstraint) throws Exception {
+		final int parallelism = 11;
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(parallelism)
+			.setConfiguration(getDefaultConfiguration())
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender = new JobVertex("Sender");
+			sender.setInvokableClass(CountDownLatchedSender.class);
+			sender.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("Receiver");
+			receiver.setInvokableClass(CountDownLatchedReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+
+			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+			CountDownLatchedSender.setLatch(countDownLatch);
+			CountDownLatchedReceiver.setLatch(countDownLatch);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			if (withCoLocationConstraint) {
+				receiver.setStrictlyCoLocatedWith(sender);
+			}
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			miniCluster.executeJobBlocking(jobGraph);
+		}
+	}
+
+	/**
+	 * A sender that does not exit until all receivers are running.
+	 */
+	public static class CountDownLatchedSender extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedSender.latch = latch;
+		}
+
+		public CountDownLatchedSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+
+			try {
+				writer.emit(new IntValue(42));
+				writer.emit(new IntValue(1337));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers();
+				latch.await();
+			}
+		}
+	}
+
+	/**
+	 * A receiver that counts down the latch on running.
+	 */
+	public static class CountDownLatchedReceiver extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedReceiver.latch = latch;
+		}
+
+		public CountDownLatchedReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+
+			RecordReader<IntValue> reader = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			IntValue i1 = reader.next();
+			IntValue i2 = reader.next();
+			IntValue i3 = reader.next();
+
+			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+				throw new Exception("Wrong data received.");
+			}
+		}
+	}
+
+	/**
+	 * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
+	 * it cannot complete.
+	 */
+	@Test
+	public void testSlotSharingForTwoInputsJob() throws Exception {
+		final int parallelism = 11;
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(parallelism)
+			.setConfiguration(getDefaultConfiguration())
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender1 = new JobVertex("Sender1");
+			sender1.setInvokableClass(CountDownLatchedSender.class);
+			sender1.setParallelism(parallelism);
+
+			final JobVertex sender2 = new JobVertex("Sender2");
+			sender2.setInvokableClass(CountDownLatchedSender.class);
+			sender2.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("Receiver");
+			receiver.setInvokableClass(CountDownLatchedAgnosticBinaryReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+
+			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+			CountDownLatchedSender.setLatch(countDownLatch);
+			CountDownLatchedAgnosticBinaryReceiver.setLatch(countDownLatch);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
+			sender1.setSlotSharingGroup(sharingGroup);
+			sender2.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			final JobGraph jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver);
+
+			miniCluster.executeJobBlocking(jobGraph);
+		}
+	}
+
+	/**
+	 * A receiver that counts down the latch on running.
+	 */
+	public static class CountDownLatchedAgnosticBinaryReceiver extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedAgnosticBinaryReceiver.latch = latch;
+		}
+
+		public CountDownLatchedAgnosticBinaryReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+
+			RecordReader<IntValue> reader1 = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			RecordReader<IntValue> reader2 = new RecordReader<>(
+				getEnvironment().getInputGate(1),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			while (reader1.next() != null) { }
+			while (reader2.next() != null) { }
+		}
+	}
+
+	@Test
+	public void testSlotSharingForForwardJobWithFailedTaskManager() throws Exception {
+		final int parallelism = 20;
+		final int numOfTaskManagers = 2;
+
+		final Configuration configuration = getDefaultConfiguration();
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(numOfTaskManagers)
+			.setNumSlotsPerTaskManager(parallelism / numOfTaskManagers)
+			.setConfiguration(configuration)
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
 
 Review comment:
   I think it would be a good idea to separate tests which modify the `MiniCluster` from those which don't. For the latter, the `MiniCluster` could be started once for the test class instead of for every test. This will speed up the execution.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services