You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:06 UTC
[42/63] [abbrv] Adjust ExecutionGraph state machine to TaskManager's
failing model (direct transitions to canceled)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index fd9f10d..e8f8b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRespon
import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
@@ -218,7 +219,10 @@ public class TaskManager implements TaskOperationProtocol {
// Start local RPC server, give it the number of threads as we have slots
try {
- this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numberOfSlots);
+ // heuristic: use one handler thread per slot, but at most twice the number of CPU cores
+ final int numHandlers = Math.min(numberOfSlots, 2*Hardware.getNumberCPUCores());
+
+ this.taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostAddress(), ipcPort, numHandlers);
this.taskManagerServer.start();
} catch (IOException e) {
LOG.error("Failed to start TaskManager server. " + e.getMessage(), e);
@@ -396,6 +400,8 @@ public class TaskManager implements TaskOperationProtocol {
LOG.info("Shutting down TaskManager");
+ cancelAndClearEverything();
+
// first, stop the heartbeat thread and wait for it to terminate
this.heartbeatThread.interrupt();
try {
@@ -540,7 +546,13 @@ public class TaskManager implements TaskOperationProtocol {
final int taskIndex = tdd.getIndexInSubtaskGroup();
final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
+ boolean jarsRegistered = false;
+
try {
+ // library and classloader issues first
+ LibraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
+ jarsRegistered = true;
+
final ClassLoader userCodeClassLoader = LibraryCacheManager.getClassLoader(jobID);
if (userCodeClassLoader == null) {
throw new Exception("No user code ClassLoader available.");
@@ -578,11 +590,17 @@ public class TaskManager implements TaskOperationProtocol {
cpTasks.put(e.getKey(), cp);
}
env.addCopyTasksForCacheFile(cpTasks);
-
+
if (!task.startExecution()) {
throw new Exception("Cannot start task. Task was canceled or failed.");
}
+ // final check that we can go (we do this after the registration, so that the "happens-before"
+ // relationship ensures that either the shutdown removes this task, or we are aware of the shutdown)
+ if (shutdownStarted.get()) {
+ throw new Exception("Task Manager is shut down.");
+ }
+
success = true;
return new TaskOperationResult(executionId, true);
}
@@ -590,6 +608,7 @@ public class TaskManager implements TaskOperationProtocol {
if (!success) {
// remove task
this.runningTasks.remove(executionId);
+
// delete distributed cache files
for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration())) {
this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID);
@@ -600,11 +619,13 @@ public class TaskManager implements TaskOperationProtocol {
catch (Throwable t) {
LOG.error("Could not instantiate task", t);
- try {
- LibraryCacheManager.unregister(jobID);
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+ if (jarsRegistered) {
+ try {
+ LibraryCacheManager.unregister(jobID);
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
+ }
}
}
@@ -623,7 +644,9 @@ public class TaskManager implements TaskOperationProtocol {
// Task de-registration must be atomic
final Task task = this.runningTasks.remove(executionId);
if (task == null) {
- LOG.error("Cannot find task with ID " + executionId + " to unregister");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot find task with ID " + executionId + " to unregister");
+ }
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 91d9973..8e276e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -58,7 +58,9 @@ public class ExecutorThreadFactory implements ThreadFactory {
@Override
public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index ac76623..41ca7da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
@@ -257,7 +257,7 @@ public class ExecutionGraphDeploymentTest {
}
});
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
for (int i = 0; i < dop1 + dop2; i++) {
scheduler.newInstanceAvailable(getInstance(taskManager));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index ce1ab30..a351209 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
@@ -417,6 +417,45 @@ public class ExecutionVertexCancelTest {
}
}
+ @Test
+ public void testSendCancelAndReceiveFail() {
+
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+ final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
+
+ final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+ when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed"));
+
+ Instance instance = getInstance(taskManager);
+ AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+ setVertexState(vertex, ExecutionState.RUNNING);
+ setVertexResource(vertex, slot);
+
+ assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+
+ vertex.cancel();
+
+ assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+
+ vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
+
+ assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
+
+ assertTrue(slot.isReleased());
+
+ assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Actions after a vertex has been canceled or while canceling
// --------------------------------------------------------------------------------------------
@@ -436,7 +475,7 @@ public class ExecutionVertexCancelTest {
// scheduling after being created should be tolerated (no exception) because
// it can occur as the result of races
{
- DefaultScheduler scheduler = mock(DefaultScheduler.class);
+ Scheduler scheduler = mock(Scheduler.class);
vertex.scheduleForExecution(scheduler, false);
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
@@ -475,7 +514,7 @@ public class ExecutionVertexCancelTest {
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
setVertexState(vertex, ExecutionState.CANCELING);
- DefaultScheduler scheduler = mock(DefaultScheduler.class);
+ Scheduler scheduler = mock(Scheduler.class);
vertex.scheduleForExecution(scheduler, false);
fail("Method should throw an exception");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 43d6547..f59b326 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
@@ -54,7 +54,7 @@ public class ExecutionVertexSchedulingTest {
final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
- DefaultScheduler scheduler = mock(DefaultScheduler.class);
+ Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
- DefaultScheduler scheduler = mock(DefaultScheduler.class);
+ Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -124,7 +124,7 @@ public class ExecutionVertexSchedulingTest {
final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
- DefaultScheduler scheduler = mock(DefaultScheduler.class);
+ Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index c7e3463..8e87e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -32,35 +32,51 @@ import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.util.StringUtils;
+
import org.junit.Test;
/**
* This test is intended to cover the basic functionality of the {@link JobManager}.
*/
public class JobManagerITCase {
-
+
@Test
- public void testSingleVertexJob() {
+ public void testSingleVertexJobImmediately() {
+
+ final int NUM_TASKS = 133;
+
try {
final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
- vertex.setParallelism(3);
+ vertex.setParallelism(NUM_TASKS);
vertex.setInvokableClass(NoOpInvokable.class);
final JobGraph jobGraph = new JobGraph("Test Job", vertex);
- JobManager jm = startJobManager(3);
+ JobManager jm = startJobManager(NUM_TASKS);
try {
+ assertEquals(NUM_TASKS, jm.getAvailableSlots());
+
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -77,7 +93,6 @@ public class JobManagerITCase {
success = true;
break;
}
-
else if (state == JobStatus.FAILED || state == JobStatus.CANCELED) {
break;
}
@@ -102,6 +117,475 @@ public class JobManagerITCase {
}
}
+ @Test
+ public void testSingleVertexJobQueued() {
+
+ final int NUM_TASKS = 111;
+
+ try {
+ final AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+ vertex.setParallelism(NUM_TASKS);
+ vertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+ jobGraph.setAllowQueuedScheduling(true);
+
+ JobManager jm = startJobManager(10);
+ try {
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testForwardJob() {
+
+ final int NUM_TASKS = 31;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(Receiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(2 * NUM_TASKS);
+ try {
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBipartiteJob() {
+
+ final int NUM_TASKS = 31;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(AgnosticReceiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Bipartite Job", sender, receiver);
+
+ JobManager jm = startJobManager(2 * NUM_TASKS);
+ try {
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTwoInputJob() {
+
+ final int NUM_TASKS = 13;
+
+ try {
+ final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
+ final AbstractJobVertex sender2 = new AbstractJobVertex("Sender2");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender1.setInvokableClass(Sender.class);
+ sender2.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(AgnosticReceiver.class);
+
+ sender1.setParallelism(NUM_TASKS);
+ sender2.setParallelism(2*NUM_TASKS);
+ receiver.setParallelism(3*NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
+
+ final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
+
+ JobManager jm = startJobManager(6 * NUM_TASKS);
+ try {
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJobFailingSender() {
+
+ final int NUM_TASKS = 100;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(ExceptionSender.class);
+ receiver.setInvokableClass(Receiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(NUM_TASKS);
+ try {
+ assertEquals(NUM_TASKS, jm.getAvailableSlots());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJobSometimesFailingSender() {
+
+ final int NUM_TASKS = 100;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(SometimesExceptionSender.class);
+ receiver.setInvokableClass(Receiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(NUM_TASKS);
+ try {
+ assertEquals(NUM_TASKS, jm.getAvailableSlots());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJobFailingReceiver() {
+
+ final int NUM_TASKS = 200;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(ExceptionReceiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(2 * NUM_TASKS);
+ try {
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test failure in instantiation, where all fail by themselves
+ */
+ @Test
+ public void testJobFailingInstantiation() {
+
+ final int NUM_TASKS = 200;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(InstantiationErrorSender.class);
+ receiver.setInvokableClass(Receiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(NUM_TASKS);
+ try {
+ assertEquals(NUM_TASKS, jm.getAvailableSlots());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test failure in instantiation, where some have to be canceled (not all fail by themselves)
+ */
+ @Test
+ public void testJobFailingSomeInstantiations() {
+
+ final int NUM_TASKS = 200;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(SometimesInstantiationErrorSender.class);
+ receiver.setInvokableClass(Receiver.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ JobManager jm = startJobManager(NUM_TASKS);
+ try {
+ assertEquals(NUM_TASKS, jm.getAvailableSlots());
+
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ for (Execution e : eg.getRegisteredExecutions().values()) {
+ System.out.println(e + StringUtils.arrayAwareToString(e.getStateTimestamps()));
+ }
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private static final JobManager startJobManager(int numSlots) throws Exception {
@@ -114,6 +598,17 @@ public class JobManagerITCase {
GlobalConfiguration.includeConfiguration(cfg);
JobManager jm = new JobManager(ExecutionMode.LOCAL);
+
+ // we need to wait until the taskmanager is registered
+ // max time is 5 seconds
+ long deadline = System.currentTimeMillis() + 5000;
+
+ while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+
+ assertEquals(numSlots, jm.getAvailableSlots());
+
return jm;
}
@@ -133,4 +628,169 @@ public class JobManagerITCase {
throw new IOException("could not find free port");
}
+
+ // --------------------------------------------------------------------------------------------
+ // Simple test tasks
+ // --------------------------------------------------------------------------------------------
+
+ public static final class Sender extends AbstractInvokable {
+
+ private RecordWriter<IntegerRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ writer = new RecordWriter<IntegerRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ writer.initializeSerializers();
+ writer.emit(new IntegerRecord(42));
+ writer.emit(new IntegerRecord(1337));
+ writer.flush();
+ }
+ }
+
+ public static final class Receiver extends AbstractInvokable {
+
+ private RecordReader<IntegerRecord> reader;
+
+ @Override
+ public void registerInputOutput() {
+ reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ IntegerRecord i1 = reader.next();
+ IntegerRecord i2 = reader.next();
+ IntegerRecord i3 = reader.next();
+
+ if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+ throw new Exception("Wrong Data Received");
+ }
+ }
+ }
+
+ public static final class AgnosticReceiver extends AbstractInvokable {
+
+ private RecordReader<IntegerRecord> reader;
+
+ @Override
+ public void registerInputOutput() {
+ reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ while (reader.next() != null);
+ }
+ }
+
+ public static final class AgnosticBinaryReceiver extends AbstractInvokable {
+
+ private RecordReader<IntegerRecord> reader1;
+ private RecordReader<IntegerRecord> reader2;
+
+ @Override
+ public void registerInputOutput() {
+ reader1 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ reader2 = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ while (reader1.next() != null);
+ while (reader2.next() != null);
+ }
+ }
+
+ public static final class ExceptionSender extends AbstractInvokable {
+
+ private RecordWriter<IntegerRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ writer = new RecordWriter<IntegerRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ writer.initializeSerializers();
+
+ throw new Exception("Test Exception");
+ }
+ }
+
+ public static final class ExceptionReceiver extends AbstractInvokable {
+
+ @Override
+ public void registerInputOutput() {
+ new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ throw new Exception("Expected Test Exception");
+ }
+ }
+
+ public static final class SometimesExceptionSender extends AbstractInvokable {
+
+ private RecordWriter<IntegerRecord> writer;
+
+ @Override
+ public void registerInputOutput() {
+ writer = new RecordWriter<IntegerRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ writer.initializeSerializers();
+
+ if (Math.random() < 0.05) {
+ throw new Exception("Test Exception");
+ } else {
+ Object o = new Object();
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ }
+
+ public static final class InstantiationErrorSender extends AbstractInvokable {
+
+ public InstantiationErrorSender() {
+ throw new RuntimeException("Test Exception in Constructior");
+ }
+
+ @Override
+ public void registerInputOutput() {}
+
+ @Override
+ public void invoke() {}
+ }
+
+ public static final class SometimesInstantiationErrorSender extends AbstractInvokable {
+
+ public SometimesInstantiationErrorSender() {
+ if (Math.random() < 0.05) {
+ throw new RuntimeException("Test Exception in Constructior");
+ }
+ }
+
+ @Override
+ public void registerInputOutput() {
+ new RecordWriter<IntegerRecord>(this);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ Object o = new Object();
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index b092312..2892384 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
/**
- * Tests for the {@link DefaultScheduler} when scheduling individual tasks.
+ * Tests for the {@link Scheduler} when scheduling individual tasks.
*/
public class SchedulerIsolatedTasksTest {
@Test
public void testAddAndRemoveInstance() {
try {
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
@@ -111,7 +111,7 @@ public class SchedulerIsolatedTasksTest {
@Test
public void testScheduleImmediately() {
try {
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
assertEquals(0, scheduler.getNumberOfAvailableSlots());
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -181,7 +181,7 @@ public class SchedulerIsolatedTasksTest {
final int NUM_TASKS_TO_SCHEDULE = 2000;
try {
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
for (int i = 0;i < NUM_INSTANCES; i++) {
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -270,7 +270,7 @@ public class SchedulerIsolatedTasksTest {
@Test
public void testScheduleWithDyingInstances() {
try {
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
@@ -330,7 +330,7 @@ public class SchedulerIsolatedTasksTest {
@Test
public void testSchedulingLocation() {
try {
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index afc0db9..c641524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -42,7 +42,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
scheduler.newInstanceAvailable(i1);
@@ -124,7 +124,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -240,7 +240,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(2));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -346,7 +346,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 1 tasks from the first vertex group and 2 from the second
@@ -393,7 +393,7 @@ public class SchedulerSlotSharingTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(3));
scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -535,7 +535,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
@@ -583,7 +583,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- DefaultScheduler scheduler = new DefaultScheduler();
+ Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 13261d7..ba08a9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -114,11 +114,6 @@ public class TaskManagerTest {
Collections.<GateDeploymentDescriptor>emptyList(),
new String[0], 0);
- LibraryCacheManager.register(jid1, new String[0]);
- LibraryCacheManager.register(jid2, new String[0]);
- assertNotNull(LibraryCacheManager.getClassLoader(jid1));
- assertNotNull(LibraryCacheManager.getClassLoader(jid2));
-
TaskOperationResult result1 = tm.submitTask(tdd1);
TaskOperationResult result2 = tm.submitTask(tdd2);
@@ -196,10 +191,6 @@ public class TaskManagerTest {
Collections.<GateDeploymentDescriptor>emptyList(),
new String[0], 0);
- LibraryCacheManager.register(jid, new String[0]);
- LibraryCacheManager.register(jid, new String[0]);
- assertNotNull(LibraryCacheManager.getClassLoader(jid));
-
assertFalse(tm.submitTask(tdd1).isSuccess());
assertFalse(tm.submitTask(tdd2).isSuccess());
@@ -250,11 +241,6 @@ public class TaskManagerTest {
Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
new String[0], 0);
- // register the job twice (for two tasks) at the lib cache
- LibraryCacheManager.register(jid, new String[0]);
- LibraryCacheManager.register(jid, new String[0]);
- assertNotNull(LibraryCacheManager.getClassLoader(jid));
-
// deploy sender before receiver, so the target is online when the sender requests the connection info
TaskOperationResult result2 = tm.submitTask(tdd2);
TaskOperationResult result1 = tm.submitTask(tdd1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 205ad00..c6379eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.util;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index 27fa540..e2312ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;