Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/15 07:46:00 UTC

[jira] [Commented] (FLINK-10549) Remove Legacy* Tests based on legacy mode

    [ https://issues.apache.org/jira/browse/FLINK-10549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649851#comment-16649851 ] 

ASF GitHub Bot commented on FLINK-10549:
----------------------------------------

zentol closed pull request #6842: [FLINK-10549] [test] Remove Legacy* Tests based on legacy mode
URL: https://github.com/apache/flink/pull/6842

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
deleted file mode 100644
index 1dd56a775aa..00000000000
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro;
-
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-
-/**
- * IT case for the {@link AvroExternalJarProgram}.
- */
-public class LegacyAvroExternalJarProgramITCase extends TestLogger {
-
-	private static final String JAR_FILE = "maven-test-jar.jar";
-
-	private static final String TEST_DATA_FILE = "/testdata.avro";
-
-	@Test
-	public void testExternalProgram() {
-
-		LocalFlinkMiniCluster testMiniCluster = null;
-
-		try {
-			int parallelism = 4;
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-			testMiniCluster = new LocalFlinkMiniCluster(config, false);
-			testMiniCluster.start();
-
-			String jarFile = JAR_FILE;
-			String testData = getClass().getResource(TEST_DATA_FILE).toString();
-
-			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
-
-			TestEnvironment.setAsContext(
-				testMiniCluster,
-				parallelism,
-				Collections.singleton(new Path(jarFile)),
-				Collections.<URL>emptyList());
-
-			config.setString(JobManagerOptions.ADDRESS, "localhost");
-			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
-
-			program.invokeInteractiveModeForExecution();
-		}
-		catch (Throwable t) {
-			System.err.println(t.getMessage());
-			t.printStackTrace();
-			Assert.fail("Error during the packaged program execution: " + t.getMessage());
-		}
-		finally {
-			TestEnvironment.unsetAsContext();
-
-			if (testMiniCluster != null) {
-				try {
-					testMiniCluster.stop();
-				} catch (Throwable t) {
-					// ignore
-				}
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
deleted file mode 100644
index b83067c11f4..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Test for consuming a pipelined result only partially.
- */
-public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
-
-	// Test configuration
-	private static final int NUMBER_OF_TMS = 1;
-	private static final int NUMBER_OF_SLOTS_PER_TM = 1;
-	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
-
-	private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
-
-	private static TestingCluster flink;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
-		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
-
-		flink = new TestingCluster(config, true);
-
-		flink.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		flink.stop();
-	}
-
-	/**
-	 * Tests a fix for FLINK-1930.
-	 *
-	 * <p>When consuming a pipelined result only partially, it is possible that local channels
-	 * release the buffer pool, which is associated with the result partition, too early.  If the
-	 * producer is still producing data when this happens, it runs into an IllegalStateException,
-	 * because of the destroyed buffer pool.
-	 *
-	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-1930">FLINK-1930</a>
-	 */
-	@Test
-	public void testPartialConsumePipelinedResultReceiver() throws Exception {
-		final JobVertex sender = new JobVertex("Sender");
-		sender.setInvokableClass(SlowBufferSender.class);
-		sender.setParallelism(PARALLELISM);
-
-		final JobVertex receiver = new JobVertex("Receiver");
-		receiver.setInvokableClass(SingleBufferReceiver.class);
-		receiver.setParallelism(PARALLELISM);
-
-		// The partition needs to be pipelined, otherwise the original issue does not occur, because
-		// the sender and receiver are not online at the same time.
-		receiver.connectNewDataSetAsInput(
-			sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
-		final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
-
-		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
-			sender.getID(), receiver.getID());
-
-		sender.setSlotSharingGroup(slotSharingGroup);
-		receiver.setSlotSharingGroup(slotSharingGroup);
-
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	/**
-	 * Sends a fixed number of buffers and sleeps in-between sends.
-	 */
-	public static class SlowBufferSender extends AbstractInvokable {
-
-		public SlowBufferSender(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
-
-			for (int i = 0; i < 8; i++) {
-				final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
-				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
-				Thread.sleep(50);
-				bufferBuilder.finish();
-			}
-		}
-	}
-
-	/**
-	 * Reads a single buffer and recycles it.
-	 */
-	public static class SingleBufferReceiver extends AbstractInvokable {
-
-		public SingleBufferReceiver(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			InputGate gate = getEnvironment().getInputGate(0);
-			Buffer buffer = gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer();
-			if (buffer != null) {
-				buffer.recycleBuffer();
-			}
-		}
-	}
-}
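
The javadoc above describes the FLINK-1930 race: a receiver that consumes a pipelined result only partially can release the buffer pool associated with the result partition while the producer is still blocked requesting a buffer from it. The following plain-Java sketch illustrates that hazard in isolation; ToyBufferPool and all other names are hypothetical stand-ins for illustration, not Flink's internal buffer-pool API:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PartialConsumeSketch {

	/** A toy pool that can be destroyed while a producer is still using it. */
	static class ToyBufferPool {
		private final BlockingQueue<byte[]> buffers = new ArrayBlockingQueue<>(2);
		private volatile boolean destroyed;

		ToyBufferPool() {
			buffers.add(new byte[1024]);
			buffers.add(new byte[1024]);
		}

		byte[] requestBufferBlocking() throws InterruptedException {
			byte[] buffer = buffers.take(); // blocks when the pool is exhausted
			if (destroyed) {
				// Mirrors the IllegalStateException the producer runs into.
				throw new IllegalStateException("Buffer pool has been destroyed.");
			}
			return buffer;
		}

		void recycle(byte[] buffer) {
			buffers.offer(buffer);
		}

		/** Called by the consumer side; releasing too early races with the producer. */
		void destroy() {
			destroyed = true;
			buffers.offer(new byte[0]); // wake a blocked producer so it fails fast
		}
	}

	public static void main(String[] args) throws Exception {
		ToyBufferPool pool = new ToyBufferPool();

		Thread producer = new Thread(() -> {
			try {
				while (true) {
					byte[] buffer = pool.requestBufferBlocking();
					Thread.sleep(10); // a "slow" sender, as in SlowBufferSender above
					pool.recycle(buffer);
				}
			} catch (IllegalStateException | InterruptedException e) {
				System.out.println("Producer stopped: " + e);
			}
		});
		producer.start();

		Thread.sleep(50);
		pool.destroy(); // the receiver read one buffer and released the pool early
		producer.join();
	}
}

The deleted test exercised the same interleaving through SlowBufferSender and SingleBufferReceiver on a real TestingCluster.
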
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
deleted file mode 100644
index 846901af796..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
-
-public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
-
-	private static final int NUMBER_OF_TMS = 2;
-	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
-	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
-
-	private static TestingCluster flink;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		flink = TestingUtils.startTestingCluster(
-				NUMBER_OF_SLOTS_PER_TM,
-				NUMBER_OF_TMS,
-				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		flink.stop();
-	}
-
-	/**
-	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
-	 * result.
-	 *
-	 * <pre>
-	 *                             +----------+
-	 *            +-- pipelined -> | Receiver |
-	 * +--------+ |                +----------+
-	 * | Sender |-|
-	 * +--------+ |                +----------+
-	 *            +-- blocking --> | Receiver |
-	 *                             +----------+
-	 * </pre>
-	 *
-	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
-	 * one after all subtasks are finished.
-	 */
-	@Test
-	public void testMixedPipelinedAndBlockingResults() throws Exception {
-		final JobVertex sender = new JobVertex("Sender");
-		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
-		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
-		sender.setParallelism(PARALLELISM);
-
-		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
-		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
-		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
-		pipelinedReceiver.setParallelism(PARALLELISM);
-
-		pipelinedReceiver.connectNewDataSetAsInput(
-				sender,
-				DistributionPattern.ALL_TO_ALL,
-				ResultPartitionType.PIPELINED);
-
-		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
-		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
-		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
-		blockingReceiver.setParallelism(PARALLELISM);
-
-		blockingReceiver.connectNewDataSetAsInput(sender,
-				DistributionPattern.ALL_TO_ALL,
-				ResultPartitionType.BLOCKING);
-
-		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
-				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
-
-		sender.setSlotSharingGroup(slotSharingGroup);
-		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
-		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
-
-		final JobGraph jobGraph = new JobGraph(
-				"Mixed pipelined and blocking result",
-				sender,
-				pipelinedReceiver,
-				blockingReceiver);
-
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable {
-
-		public static final String CONFIG_KEY = "number-of-times-to-send";
-
-		public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2);
-
-			// The order of intermediate result creation in the job graph specifies which produced
-			// result partition is pipelined/blocking.
-			final RecordWriter<IntValue> pipelinedWriter =
-					new RecordWriter<>(getEnvironment().getWriter(0));
-
-			final RecordWriter<IntValue> blockingWriter =
-					new RecordWriter<>(getEnvironment().getWriter(1));
-
-			writers.add(pipelinedWriter);
-			writers.add(blockingWriter);
-
-			final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
-
-			final IntValue subtaskIndex = new IntValue(
-					getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
-			// Produce the first intermediate result and then the second in a serial fashion.
-			for (RecordWriter<IntValue> writer : writers) {
-				try {
-					for (int i = 0; i < numberOfTimesToSend; i++) {
-						writer.emit(subtaskIndex);
-					}
-					writer.flushAll();
-				}
-				finally {
-					writer.clearBuffers();
-				}
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
deleted file mode 100644
index 356d94a3815..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacySlotCountExceedingParallelismTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.reader.RecordReader;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.BitSet;
-
-public class LegacySlotCountExceedingParallelismTest extends TestLogger {
-
-	// Test configuration
-	private static final int NUMBER_OF_TMS = 2;
-	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
-	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
-
-	public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)";
-
-	private static TestingCluster flink;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		flink = TestingUtils.startTestingCluster(
-				NUMBER_OF_SLOTS_PER_TM,
-				NUMBER_OF_TMS,
-				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (flink != null) {
-			flink.stop();
-		}
-	}
-
-	@Test
-	public void testNoSlotSharingAndBlockingResultSender() throws Exception {
-		// Sender with higher parallelism than available slots
-		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM);
-		submitJobGraphAndWait(jobGraph);
-	}
-
-	@Test
-	public void testNoSlotSharingAndBlockingResultReceiver() throws Exception {
-		// Receiver with higher parallelism than available slots
-		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM, PARALLELISM * 2);
-		submitJobGraphAndWait(jobGraph);
-	}
-
-	@Test
-	public void testNoSlotSharingAndBlockingResultBoth() throws Exception {
-		// Both sender and receiver with higher parallelism than available slots
-		JobGraph jobGraph = createTestJobGraph(JOB_NAME, PARALLELISM * 2, PARALLELISM * 2);
-		submitJobGraphAndWait(jobGraph);
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
-		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
-	}
-
-	private JobGraph createTestJobGraph(
-			String jobName,
-			int senderParallelism,
-			int receiverParallelism) {
-
-		// The sender and receiver invokable logic ensure that each subtask gets the expected data
-		final JobVertex sender = new JobVertex("Sender");
-		sender.setInvokableClass(RoundRobinSubtaskIndexSender.class);
-		sender.getConfiguration().setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism);
-		sender.setParallelism(senderParallelism);
-
-		final JobVertex receiver = new JobVertex("Receiver");
-		receiver.setInvokableClass(SubtaskIndexReceiver.class);
-		receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism);
-		receiver.setParallelism(receiverParallelism);
-
-		receiver.connectNewDataSetAsInput(
-				sender,
-				DistributionPattern.ALL_TO_ALL,
-				ResultPartitionType.BLOCKING);
-
-		final JobGraph jobGraph = new JobGraph(jobName, sender, receiver);
-
-		// We need to allow queued scheduling, because there are not enough slots available
-		// to run all tasks at once. We queue tasks and then let them finish/consume the blocking
-		// result one after the other.
-		jobGraph.setAllowQueuedScheduling(true);
-
-		return jobGraph;
-	}
-
-	/**
-	 * Sends the subtask index a configurable number of times in a round-robin fashion.
-	 */
-	public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {
-
-		public static final String CONFIG_KEY = "number-of-times-to-send";
-
-		public RoundRobinSubtaskIndexSender(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
-			final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
-
-			final IntValue subtaskIndex = new IntValue(
-					getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
-			try {
-				for (int i = 0; i < numberOfTimesToSend; i++) {
-					writer.emit(subtaskIndex);
-				}
-				writer.flushAll();
-			}
-			finally {
-				writer.clearBuffers();
-			}
-		}
-	}
-
-	/**
-	 * Expects to receive the subtask index from a configurable number of sender tasks.
-	 */
-	public static class SubtaskIndexReceiver extends AbstractInvokable {
-
-		public static final String CONFIG_KEY = "number-of-indexes-to-receive";
-
-		public SubtaskIndexReceiver(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			RecordReader<IntValue> reader = new RecordReader<>(
-					getEnvironment().getInputGate(0),
-					IntValue.class,
-					getEnvironment().getTaskManagerInfo().getTmpDirectories());
-
-			try {
-				final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
-				final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
-
-				IntValue record;
-
-				int numberOfReceivedSubtaskIndexes = 0;
-
-				while ((record = reader.next()) != null) {
-					// Check that we don't receive more than expected
-					numberOfReceivedSubtaskIndexes++;
-
-					if (numberOfReceivedSubtaskIndexes > numberOfSubtaskIndexesToReceive) {
-						throw new IllegalStateException("Received more records than expected.");
-					}
-
-					int subtaskIndex = record.getValue();
-
-					// Check that we only receive each subtask index once
-					if (receivedSubtaskIndexes.get(subtaskIndex)) {
-						throw new IllegalStateException("Received the same subtask index twice.");
-					}
-					else {
-						receivedSubtaskIndexes.set(subtaskIndex, true);
-					}
-				}
-
-				// Check that we have received all expected subtask indexes
-				if (receivedSubtaskIndexes.cardinality() != numberOfSubtaskIndexesToReceive) {
-					throw new IllegalStateException("Finished receive, but did not receive "
-							+ "all expected subtask indexes.");
-				}
-			}
-			finally {
-				reader.clearBuffers();
-			}
-		}
-	}
-}
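
SubtaskIndexReceiver above verifies three properties about the records it reads: no more records than expected arrive, no subtask index arrives twice, and every expected index arrives at least once. The BitSet bookkeeping is independent of the Flink test harness; here is a self-contained sketch of the same checks (class and method names are hypothetical):

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public final class ExactlyOnceIndexCheck {

	static void verify(List<Integer> receivedIndexes, int expectedCount) {
		BitSet seen = new BitSet(expectedCount);
		int received = 0;

		for (int index : receivedIndexes) {
			// Check that we don't receive more than expected.
			if (++received > expectedCount) {
				throw new IllegalStateException("Received more records than expected.");
			}
			// Check that we only receive each index once.
			if (seen.get(index)) {
				throw new IllegalStateException("Received subtask index " + index + " twice.");
			}
			seen.set(index);
		}

		// Check that we have received all expected indexes.
		if (seen.cardinality() != expectedCount) {
			throw new IllegalStateException("Did not receive all expected subtask indexes.");
		}
	}

	public static void main(String[] args) {
		verify(Arrays.asList(0, 1, 2, 3), 4); // passes: each index exactly once
		try {
			verify(Arrays.asList(0, 1, 1, 3), 4);
		} catch (IllegalStateException e) {
			System.out.println("Caught as expected: " + e.getMessage());
		}
	}
}
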
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
deleted file mode 100644
index f1f5e45d369..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class LegacyTaskCancelAsyncProducerConsumerITCase extends TestLogger {
-
-	// The Exceptions thrown by the producer/consumer Threads
-	private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
-	private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
-
-	// The Threads producing/consuming the intermediate stream
-	private static volatile Thread ASYNC_PRODUCER_THREAD;
-	private static volatile Thread ASYNC_CONSUMER_THREAD;
-
-	/**
-	 * Tests that a task waiting on an async producer/consumer that is stuck
-	 * in a blocking buffer request can be properly cancelled.
-	 *
-	 * <p>This is currently required for the Flink Kafka sources, which spawn
-	 * a separate Thread consuming from Kafka and producing the intermediate
-	 * streams in the spawned Thread instead of the main task Thread.
-	 */
-	@Test
-	public void testCancelAsyncProducerAndConsumer() throws Exception {
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
-		TestingCluster flink = null;
-
-		try {
-			// Cluster
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
-
-			flink = new TestingCluster(config, true);
-			flink.start();
-
-			// Job with async producer and consumer
-			JobVertex producer = new JobVertex("AsyncProducer");
-			producer.setParallelism(1);
-			producer.setInvokableClass(AsyncProducer.class);
-
-			JobVertex consumer = new JobVertex("AsyncConsumer");
-			consumer.setParallelism(1);
-			consumer.setInvokableClass(AsyncConsumer.class);
-			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-
-			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
-			producer.setSlotSharingGroup(slot);
-			consumer.setSlotSharingGroup(slot);
-
-			JobGraph jobGraph = new JobGraph(producer, consumer);
-
-			// Submit job and wait until running
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-			flink.submitJobDetached(jobGraph);
-
-			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(runningFuture, deadline.timeLeft());
-
-			// Wait for blocking requests, cancel and wait for cancellation
-			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
-			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
-
-			boolean producerBlocked = false;
-			for (int i = 0; i < 50; i++) {
-				Thread thread = ASYNC_PRODUCER_THREAD;
-
-				if (thread != null && thread.isAlive()) {
-					StackTraceElement[] stackTrace = thread.getStackTrace();
-					producerBlocked = isInBlockingBufferRequest(stackTrace);
-				}
-
-				if (producerBlocked) {
-					break;
-				} else {
-					// Retry
-					Thread.sleep(500L);
-				}
-			}
-
-			// Verify that async producer is in blocking request
-			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
-
-			boolean consumerWaiting = false;
-			for (int i = 0; i < 50; i++) {
-				Thread thread = ASYNC_CONSUMER_THREAD;
-
-				if (thread != null && thread.isAlive()) {
-					consumerWaiting = thread.getState() == Thread.State.WAITING;
-				}
-
-				if (consumerWaiting) {
-					break;
-				} else {
-					// Retry
-					Thread.sleep(500L);
-				}
-			}
-
-			// Verify that the async consumer is waiting
-			assertTrue("Consumer thread is not blocked.", consumerWaiting);
-
-			msg = new CancelJob(jobGraph.getJobID());
-			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(cancelFuture, deadline.timeLeft());
-
-			Await.ready(cancelledFuture, deadline.timeLeft());
-
-			// Verify the expected Exceptions
-			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
-			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
-
-			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
-			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
-		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
-		}
-	}
-
-	/**
-	 * Invokable emitting records in a separate Thread (not the main Task
-	 * thread).
-	 */
-	public static class AsyncProducer extends AbstractInvokable {
-
-		public AsyncProducer(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			Thread producer = new ProducerThread(getEnvironment().getWriter(0));
-
-			// Publish the async producer for the main test Thread
-			ASYNC_PRODUCER_THREAD = producer;
-
-			producer.start();
-
-			// Wait for the producer Thread to finish. This is executed in the
-			// main Task thread and will be interrupted on cancellation.
-			while (producer.isAlive()) {
-				try {
-					producer.join();
-				} catch (InterruptedException ignored) {
-				}
-			}
-		}
-
-		/**
-		 * The Thread emitting the records.
-		 */
-		private static class ProducerThread extends Thread {
-
-			private final RecordWriter<LongValue> recordWriter;
-
-			public ProducerThread(ResultPartitionWriter partitionWriter) {
-				this.recordWriter = new RecordWriter<>(partitionWriter);
-			}
-
-			@Override
-			public void run() {
-				LongValue current = new LongValue(0);
-
-				try {
-					while (true) {
-						current.setValue(current.getValue() + 1);
-						recordWriter.emit(current);
-						recordWriter.flushAll();
-					}
-				} catch (Exception e) {
-					ASYNC_PRODUCER_EXCEPTION = e;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Invokable consuming buffers in a separate Thread (not the main Task
-	 * thread).
-	 */
-	public static class AsyncConsumer extends AbstractInvokable {
-
-		public AsyncConsumer(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0));
-
-			// Publish the async consumer for the main test Thread
-			ASYNC_CONSUMER_THREAD = consumer;
-
-			consumer.start();
-
-			// Wait for the consumer Thread to finish. This is executed in the
-			// main Task thread and will be interrupted on cancellation.
-			while (consumer.isAlive()) {
-				try {
-					consumer.join();
-				} catch (InterruptedException ignored) {
-				}
-			}
-		}
-
-		/**
-		 * The Thread consuming buffers.
-		 */
-		private static class ConsumerThread extends Thread {
-
-			private final InputGate inputGate;
-
-			public ConsumerThread(InputGate inputGate) {
-				this.inputGate = inputGate;
-			}
-
-			@Override
-			public void run() {
-				try {
-					while (true) {
-						inputGate.getNextBufferOrEvent();
-					}
-				} catch (Exception e) {
-					ASYNC_CONSUMER_EXCEPTION = e;
-				}
-			}
-		}
-	}
-}
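
AsyncProducer and AsyncConsumer above share one shutdown idiom: the main task thread joins the spawned worker in a loop and swallows InterruptedException, so cancellation interrupts delivered to the task thread are absorbed until the worker has actually terminated. A minimal standalone sketch of just that idiom (names hypothetical):

public class JoinUntilDead {

	public static void main(String[] args) {
		Thread worker = new Thread(() -> {
			try {
				Thread.sleep(200); // stands in for the produce/consume loop
			} catch (InterruptedException ignored) {
			}
		});
		worker.start();

		// Runs in the main (task) thread and may be interrupted on cancellation;
		// the loop retries join() until the worker is really gone.
		while (worker.isAlive()) {
			try {
				worker.join();
			} catch (InterruptedException ignored) {
			}
		}
		System.out.println("Worker finished; safe to return from invoke().");
	}
}
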
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
deleted file mode 100644
index b273ade2dd2..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.accumulators;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests the availability of accumulator results during runtime. The test case tests a user-defined
- * accumulator and Flink's internal accumulators for two consecutive tasks.
- *
- * <p>CHAINED[Source -> Map] -> Sink
- *
- * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
- * the task to the task manager which notifies the job manager and sends the current accumulators.
- * The task blocks until the test has been notified about the current accumulator values.
- *
- * <p>A barrier between the operators ensures that pipelining is disabled for the streaming test.
- * The batch job reads the records one at a time, while the streaming code buffers records
- * beforehand; exact guarantees about the number of records read are therefore very hard to make,
- * which is why we only check for an upper bound on the number of elements read.
- */
-public class LegacyAccumulatorLiveITCase extends TestLogger {
-
-	private static final Logger LOG = LoggerFactory.getLogger(LegacyAccumulatorLiveITCase.class);
-
-	private static ActorSystem system;
-	private static ActorGateway jobManagerGateway;
-	private static ActorRef taskManager;
-
-	private static JobID jobID;
-	private static JobGraph jobGraph;
-
-	// name of user accumulator
-	private static final String ACCUMULATOR_NAME = "test";
-
-	// number of heartbeat intervals to check
-	private static final int NUM_ITERATIONS = 5;
-
-	private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
-
-	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
-	@Before
-	public void before() throws Exception {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		TestingCluster testingCluster = new TestingCluster(config, false, true);
-		testingCluster.start();
-
-		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-		taskManager = testingCluster.getTaskManagersAsJava().get(0);
-
-		// generate test data
-		for (int i = 0; i < NUM_ITERATIONS; i++) {
-			inputData.add(i, String.valueOf(i + 1));
-		}
-
-		NotifyingMapper.finished = false;
-	}
-
-	@After
-	public void after() throws Exception {
-		JavaTestKit.shutdownActorSystem(system);
-		inputData.clear();
-	}
-
-	@Test
-	public void testBatch() throws Exception {
-
-		// The program
-		ExecutionEnvironment env = new BatchPlanExtractor();
-		env.setParallelism(1);
-
-		DataSet<String> input = env.fromCollection(inputData);
-		input
-				.flatMap(new NotifyingMapper())
-				.output(new NotifyingOutputFormat());
-
-		env.execute();
-
-		// Extract job graph and set job id for the task to notify of accumulator changes.
-		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
-		jobID = jobGraph.getJobID();
-
-		verifyResults();
-	}
-
-	@Test
-	public void testStreaming() throws Exception {
-
-		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<String> input = env.fromCollection(inputData);
-		input
-				.flatMap(new NotifyingMapper())
-				.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
-
-		jobGraph = env.getStreamGraph().getJobGraph();
-		jobID = jobGraph.getJobID();
-
-		verifyResults();
-	}
-
-	private static void verifyResults() {
-		new JavaTestKit(system) {{
-
-			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
-
-			// register for accumulator changes
-			jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
-			expectMsgEquals(TIMEOUT, true);
-
-			// submit job
-
-			jobManagerGateway.tell(
-					new JobManagerMessages.SubmitJob(
-							jobGraph,
-							ListeningBehaviour.EXECUTION_RESULT),
-					selfGateway);
-			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
-
-			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-			Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators = msg.userAccumulators();
-
-			ExecutionAttemptID mapperTaskID = null;
-
-			ExecutionAttemptID sinkTaskID = null;
-
-			/* Check for accumulator values */
-			if (checkUserAccumulators(0, userAccumulators)) {
-				LOG.info("Passed initial check for map task.");
-			} else {
-				fail("Wrong accumulator results when map task begins execution.");
-			}
-
-			int expectedAccVal = 0;
-
-			/* for mapper task */
-			for (int i = 1; i <= NUM_ITERATIONS; i++) {
-				expectedAccVal += i;
-
-				// receive message
-				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-				userAccumulators = msg.userAccumulators();
-
-				LOG.info("{}", userAccumulators);
-
-				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					LOG.info("Passed round #" + i);
-				} else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					// we determined the wrong task id and need to switch the two here
-					ExecutionAttemptID temp = mapperTaskID;
-					mapperTaskID = sinkTaskID;
-					sinkTaskID = temp;
-					LOG.info("Passed round #" + i);
-				} else {
-					fail("Failed in round #" + i);
-				}
-			}
-
-			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-			userAccumulators = msg.userAccumulators();
-
-			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-				LOG.info("Passed initial check for sink task.");
-			} else {
-				fail("Wrong accumulator results when sink task begins execution.");
-			}
-
-			/* for sink task */
-			for (int i = 1; i <= NUM_ITERATIONS; i++) {
-
-				// receive message
-				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-
-				userAccumulators = msg.userAccumulators();
-
-				LOG.info("{}", userAccumulators);
-
-				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					LOG.info("Passed round #" + i);
-				} else {
-					fail("Failed in round #" + i);
-				}
-			}
-
-			expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
-
-		}};
-	}
-
-	private static boolean checkUserAccumulators(int expected, Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap) {
-		LOG.info("checking user accumulators");
-		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME).getUnchecked()).getLocalValue();
-	}
-
-	/**
-	 * UDF that notifies when it changes the accumulator values.
-	 */
-	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private IntCounter counter = new IntCounter();
-
-		private static boolean finished = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
-			notifyTaskManagerOfAccumulatorUpdate();
-		}
-
-		@Override
-		public void flatMap(String value, Collector<Integer> out) throws Exception {
-			int val = Integer.valueOf(value);
-			counter.add(val);
-			out.collect(val);
-			LOG.debug("Emitting value {}.", value);
-			notifyTaskManagerOfAccumulatorUpdate();
-		}
-
-		@Override
-		public void close() throws Exception {
-			finished = true;
-		}
-	}
-
-	/**
-	 * Output format which notifies of accumulator changes and waits for the previous mapper.
-	 */
-	private static class NotifyingOutputFormat implements OutputFormat<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {
-		}
-
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-			while (!NotifyingMapper.finished) {
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {}
-			}
-			notifyTaskManagerOfAccumulatorUpdate();
-		}
-
-		@Override
-		public void writeRecord(Integer record) throws IOException {
-			notifyTaskManagerOfAccumulatorUpdate();
-		}
-
-		@Override
-		public void close() throws IOException {
-		}
-	}
-
-	/**
-	 * Notify task manager of accumulator update and wait until the Heartbeat containing the message
-	 * has been reported.
-	 */
-	public static void notifyTaskManagerOfAccumulatorUpdate() {
-		new JavaTestKit(system) {{
-			Timeout timeout = new Timeout(TIMEOUT);
-			Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
-			try {
-				Await.result(ask, timeout.duration());
-			} catch (Exception e) {
-				fail("Failed to notify task manager of accumulator update.");
-			}
-		}};
-	}
-
-	/**
-	 * Helpers to generate the JobGraph.
-	 */
-	private static JobGraph getOptimizedPlan(Plan plan) {
-		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
-		JobGraphGenerator jgg = new JobGraphGenerator();
-		OptimizedPlan op = pc.compile(plan);
-		return jgg.compileJobGraph(op);
-	}
-
-	private static class BatchPlanExtractor extends LocalEnvironment {
-
-		private Plan plan = null;
-
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			plan = createProgramPlan();
-			return new JobExecutionResult(new JobID(), -1, null);
-		}
-	}
-
-	/**
-	 * This is used for creating the example topology. {@link #execute} is never called; we
-	 * only use this to call {@link #getStreamGraph()}.
-	 */
-	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
-		@Override
-		public JobExecutionResult execute() throws Exception {
-			return execute("default");
-		}
-
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			throw new RuntimeException("This should not be called.");
-		}
-	}
-}
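
Stripped of the Akka test-kit notification hooks, the user-accumulator part of NotifyingMapper above reduces to registering an IntCounter in open() and adding to it per record; the final value is then reported through the job's execution result. A minimal sketch using only APIs already imported by the deleted test:

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountingMapper extends RichFlatMapFunction<String, Integer> {
	private static final long serialVersionUID = 1L;

	private final IntCounter counter = new IntCounter();

	@Override
	public void open(Configuration parameters) throws Exception {
		// Registration makes the counter visible to the runtime under this name.
		getRuntimeContext().addAccumulator("test", counter);
	}

	@Override
	public void flatMap(String value, Collector<Integer> out) throws Exception {
		int val = Integer.valueOf(value);
		counter.add(val); // accumulated across all parallel subtasks
		out.collect(val);
	}
}
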
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
deleted file mode 100644
index fa114e1029a..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.classloading;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasProperty;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test job classloader.
- */
-public class LegacyClassLoaderITCase extends TestLogger {
-
-	private static final Logger LOG = LoggerFactory.getLogger(LegacyClassLoaderITCase.class);
-
-	private static final String INPUT_SPLITS_PROG_JAR_FILE = "customsplit-test-jar.jar";
-
-	private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "streaming-customsplit-test-jar.jar";
-
-	private static final String STREAMING_PROG_JAR_FILE = "streamingclassloader-test-jar.jar";
-
-	private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "streaming-checkpointed-classloader-test-jar.jar";
-
-	private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
-
-	private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
-
-	private static final String CUSTOM_KV_STATE_JAR_PATH = "custom_kv_state-test-jar.jar";
-
-	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
-
-	@ClassRule
-	public static final TemporaryFolder FOLDER = new TemporaryFolder();
-
-	@Rule
-	public ExpectedException expectedException = ExpectedException.none();
-
-	private static TestingCluster testCluster;
-
-	private static int parallelism;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		parallelism = 4;
-
-		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
-		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
-		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
-				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
-
-		// Savepoint path
-		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
-				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
-
-		testCluster = new TestingCluster(config, false);
-		testCluster.start();
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (testCluster != null) {
-			testCluster.stop();
-		}
-
-		TestStreamEnvironment.unsetAsContext();
-		TestEnvironment.unsetAsContext();
-	}
-
-	@Test
-	public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
-
-		PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
-
-		TestEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
-			Collections.<URL>emptyList());
-
-		inputSplitTestProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
-
-		TestStreamEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
-			Collections.<URL>emptyList());
-
-		streamingInputSplitTestProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
-		URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
-		PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
-
-		TestEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.<Path>emptyList(),
-			Collections.singleton(classpath));
-
-		inputSplitTestProg2.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		// regular streaming job
-		PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
-
-		TestStreamEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
-			Collections.<URL>emptyList());
-
-		streamingProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
-		// the test also ensures that user-specific exceptions are serializable between JobManager <--> JobClient.
-		PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
-
-		TestStreamEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
-			Collections.<URL>emptyList());
-
-		// Program should terminate with a 'SuccessException':
-		// we cannot access the SuccessException here when executing the tests with Maven, because it's not available in the jar.
-		expectedException.expectCause(
-			Matchers.<Throwable>hasProperty("cause",
-				hasProperty("class",
-					hasProperty("canonicalName", equalTo(
-						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
-
-		streamingCheckpointedProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		PackagedProgram kMeansProg = new PackagedProgram(
-			new File(KMEANS_JAR_PATH),
-			new String[] {
-				KMeansData.DATAPOINTS,
-				KMeansData.INITIAL_CENTERS,
-				"25"
-			});
-
-		TestEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(KMEANS_JAR_PATH)),
-			Collections.<URL>emptyList());
-
-		kMeansProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		int port = testCluster.getLeaderRPCPort();
-
-		// test FLINK-3633
-		final PackagedProgram userCodeTypeProg = new PackagedProgram(
-			new File(USERCODETYPE_JAR_PATH),
-			new String[] { USERCODETYPE_JAR_PATH,
-				"localhost",
-				String.valueOf(port),
-			});
-
-		TestEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
-			Collections.<URL>emptyList());
-
-		userCodeTypeProg.invokeInteractiveModeForExecution();
-	}
-
-	@Test
-	public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		File checkpointDir = FOLDER.newFolder();
-		File outputDir = FOLDER.newFolder();
-
-		final PackagedProgram program = new PackagedProgram(
-			new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
-			new String[] {
-				checkpointDir.toURI().toString(),
-				outputDir.toURI().toString()
-			});
-
-		TestStreamEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
-			Collections.<URL>emptyList());
-
-		expectedException.expectCause(
-			Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
-
-		program.invokeInteractiveModeForExecution();
-	}
-
-	/**
-	 * Tests disposal of a savepoint, which contains custom user code KvState.
-	 */
-	@Test
-	public void testDisposeSavepointWithCustomKvState() throws Exception {
-		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
-
-		File checkpointDir = FOLDER.newFolder();
-		File outputDir = FOLDER.newFolder();
-
-		final PackagedProgram program = new PackagedProgram(
-				new File(CUSTOM_KV_STATE_JAR_PATH),
-				new String[] {
-						String.valueOf(parallelism),
-						checkpointDir.toURI().toString(),
-						"5000",
-						outputDir.toURI().toString()
-				});
-
-		TestStreamEnvironment.setAsContext(
-			testCluster,
-			parallelism,
-			Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
-			Collections.<URL>emptyList()
-		);
-
-		// Execute detached
-		Thread invokeThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					program.invokeInteractiveModeForExecution();
-				} catch (ProgramInvocationException ignored) {
-					if (ignored.getCause() == null ||
-						!(ignored.getCause() instanceof JobCancellationException)) {
-						ignored.printStackTrace();
-					}
-				}
-			}
-		});
-
-		LOG.info("Starting program invoke thread");
-		invokeThread.start();
-
-		// The job ID
-		JobID jobId = null;
-
-		ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
-
-		LOG.info("Waiting for job status running.");
-
-		// Wait for running job
-		while (jobId == null && deadline.hasTimeLeft()) {
-			Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
-			RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
-
-			for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
-				jobId = runningJob.getJobId();
-				LOG.info("Job running. ID: " + jobId);
-				break;
-			}
-
-			// Retry if job is not available yet
-			if (jobId == null) {
-				Thread.sleep(100L);
-			}
-		}
-
-		LOG.info("Wait for all tasks to be running.");
-		Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
-		Await.ready(allRunning, deadline.timeLeft());
-		LOG.info("All tasks are running.");
-
-		// Trigger savepoint
-		String savepointPath = null;
-		for (int i = 0; i < 20; i++) {
-			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
-			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
-			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
-
-			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
-				savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
-				LOG.info("Triggered savepoint. Path: " + savepointPath);
-			} else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
-				Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
-				LOG.info("Failed to trigger savepoint. Retrying...", cause);
-				// This can fail if the operators are not opened yet
-				Thread.sleep(500);
-			} else {
-				throw new IllegalStateException("Unexpected response to TriggerSavepoint");
-			}
-		}
-
-		assertNotNull("Failed to trigger savepoint", savepointPath);
-
-		// Dispose savepoint
-		LOG.info("Disposing savepoint at " + savepointPath);
-		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
-		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
-
-		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
-			// Success :-)
-			LOG.info("Disposed savepoint at " + savepointPath);
-		} else if (disposeResponse instanceof DisposeSavepointFailure) {
-			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
-		} else {
-			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
-		}
-
-		// Cancel job, wait for success
-		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
-		Object response = Await.result(cancelFuture, deadline.timeLeft());
-		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
-
-		// make sure the execution is finished so as not to influence other test methods
-		invokeThread.join(deadline.timeLeft().toMillis());
-		assertFalse("Program invoke thread still running", invokeThread.isAlive());
-	}
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
deleted file mode 100644
index 174c90ee042..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests retrieval of a job from a running Flink cluster.
- */
-public class LegacyJobRetrievalITCase extends TestLogger {
-
-	private static final Semaphore lock = new Semaphore(1);
-
-	private static FlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void before() {
-		Configuration configuration = new Configuration();
-		cluster = new TestingCluster(configuration, false);
-		cluster.start();
-	}
-
-	@AfterClass
-	public static void after() {
-		cluster.stop();
-		cluster = null;
-	}
-
-	@Test
-	public void testJobRetrieval() throws Exception {
-		final JobID jobID = new JobID();
-
-		final JobVertex imalock = new JobVertex("imalock");
-		imalock.setInvokableClass(SemaphoreInvokable.class);
-
-		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
-
-		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
-
-		// acquire the lock to make sure that the job cannot complete until the job client
-		// has been attached in resumingThread
-		lock.acquire();
-		client.runDetached(jobGraph, LegacyJobRetrievalITCase.class.getClassLoader());
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final Thread resumingThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					assertNotNull(client.retrieveJob(jobID));
-				} catch (Throwable e) {
-					error.set(e);
-				}
-			}
-		}, "Flink-Job-Retriever");
-
-		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
-		final ActorSystem actorSystem = actorSystemSeq.last();
-		JavaTestKit testkit = new JavaTestKit(actorSystem);
-
-		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
-		// wait until client connects
-		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
-		// confirm registration
-		testkit.expectMsgEquals(true);
-
-		// kick off resuming
-		resumingThread.start();
-
-		// wait for client to connect
-		testkit.expectMsgAllOf(
-			TestingJobManagerMessages.getClientConnected(),
-			TestingJobManagerMessages.getClassLoadingPropsDelivered());
-
-		// client has connected, we can release the lock
-		lock.release();
-
-		resumingThread.join();
-
-		Throwable exception = error.get();
-		if (exception != null) {
-			throw new AssertionError(exception);
-		}
-	}
-
-	@Test
-	public void testNonExistingJobRetrieval() throws Exception {
-		final JobID jobID = new JobID();
-		ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
-
-		try {
-			client.retrieveJob(jobID);
-			fail();
-		} catch (JobRetrievalException ignored) {
-			// this is what we want
-		}
-	}
-
-	/**
-	 * Invokable that waits on {@link #lock} to be released and finishes afterwards.
-	 *
-	 * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
-	 */
-	public static class SemaphoreInvokable extends AbstractInvokable {
-
-		public SemaphoreInvokable(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			lock.acquire();
-			lock.release();
-		}
-	}
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Remove Legacy* Tests based on legacy mode
> -----------------------------------------
>
>                 Key: FLINK-10549
>                 URL: https://issues.apache.org/jira/browse/FLINK-10549
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> This Jira tracks the removal of tests that are based on legacy mode, are named "LegacyXXX", and are already covered by an equivalent test named "XXX" (a sketch of the new-mode setup that replaces them follows this list).
>  # We already have {{JobRetrievalITCase}}, and all test cases of {{LegacyJobRetrievalITCase}} are covered. Simply remove it.
>  # We already have {{AccumulatorLiveITCase}}, and all test cases of {{LegacyAccumulatorLiveITCase}} are covered. Simply remove it.
>  # We already have {{TaskCancelAsyncProducerConsumerITCase}}, and all test cases of {{LegacyTaskCancelAsyncProducerConsumerITCase}} are covered. Simply remove it.
>  # We already have {{ClassLoaderITCase}}, and all test cases of {{LegacyClassLoaderITCase}} are covered. Simply remove it.
>  # We already have {{SlotCountExceedingParallelismTest}}, and all test cases of {{LegacySlotCountExceedingParallelismTest}} are covered. Simply remove it.
>  # We already have {{ScheduleOrUpdateConsumersTest}}, and all test cases of {{LegacyScheduleOrUpdateConsumersTest}} are covered. Simply remove it.
>  # We already have {{PartialConsumePipelinedResultTest}}, and all test cases of {{LegacyPartialConsumePipelinedResultTest}} are covered. Simply remove it.
>  # We already have {{AvroExternalJarProgramITCase}}, and all test cases of {{LegacyAvroExternalJarProgramITCase}} are covered. Simply remove it.
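
For context: the non-legacy counterparts listed above no longer spin up a TestingCluster or LocalFlinkMiniCluster by hand. Below is a minimal sketch of the new-mode bootstrapping, assuming the Flink 1.7-era MiniClusterResource test utility; the exact packages of MiniClusterResource and MiniClusterResourceConfiguration moved between releases, and ExampleITCase is a hypothetical name, not one of the tests touched by this PR.

import org.apache.flink.configuration.Configuration;
// package location assumed; these utilities moved between Flink releases
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;

import org.junit.ClassRule;
import org.junit.Test;

/**
 * Hypothetical skeleton of a new-mode IT case; the rule replaces the
 * manual BeforeClass/AfterClass cluster plumbing seen in the removed tests.
 */
public class ExampleITCase {

	// Starts a new-mode mini cluster once for the class and shuts it down
	// after the last test; 2 TMs x 2 slots mirrors the parallelism-4 setup
	// used by the legacy setUp() above.
	@ClassRule
	public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(
		new MiniClusterResourceConfiguration.Builder()
			.setConfiguration(new Configuration())
			.setNumberTaskManagers(2)
			.setNumberSlotsPerTaskManager(2)
			.build());

	@Test
	public void testAgainstMiniCluster() throws Exception {
		// Jobs submitted through the context environment run against the
		// shared mini cluster.
	}
}

The design point is that the JUnit rule owns the cluster lifecycle, so the static cluster fields and explicit start()/stop() calls of the legacy tests disappear.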



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)