You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:55 UTC
[31/63] [abbrv] git commit: Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model
Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2d6199ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2d6199ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2d6199ff
Branch: refs/heads/master
Commit: 2d6199fff877b0532903a4b2ff2d5279671b33cb
Parents: c32569a
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 20 13:11:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 4 +-
.../api/common/io/FileInputFormatTest.java | 1 -
.../runtime/executiongraph/ExecutionVertex.java | 4 +
.../executiongraph/ExecutionVertex2.java | 25 +
.../runtime/instance/AllocatedResource.java | 163 --
.../flink/runtime/instance/AllocatedSlot.java | 73 +-
.../instance/DefaultInstanceManager.java | 134 +-
.../flink/runtime/instance/DummyInstance.java | 64 -
.../runtime/instance/HardwareDescription.java | 10 +
.../apache/flink/runtime/instance/Instance.java | 378 ++---
.../instance/InstanceConnectionInfo.java | 2 +-
.../runtime/instance/InstanceDiedException.java | 34 +
.../runtime/instance/InstanceException.java | 42 -
.../runtime/instance/InstanceListener.java | 33 +-
.../runtime/instance/InstanceNotifier.java | 77 -
.../scheduler/DefaultExecutionListener.java | 133 --
.../jobmanager/scheduler/DefaultScheduler.java | 1568 ++++++++++--------
.../scheduler/InstanceFillDegreeComparator.java | 31 +
.../jobmanager/scheduler/LifoSetQueue.java | 110 ++
.../scheduler/NoResourceAvailableException.java | 33 +
.../jobmanager/scheduler/ResourceId.java | 20 +
.../jobmanager/scheduler/ScheduledUnit.java | 67 +
.../scheduler/SchedulingException.java | 44 -
.../scheduler/SchedulingStrategy.java | 33 +
.../runtime/protocols/JobManagerProtocol.java | 22 +-
.../flink/runtime/taskmanager/TaskManager.java | 370 +++--
.../RegisterTaskManagerResult.java | 56 -
.../instance/LocalInstanceManagerTest.java | 11 -
.../scheduler/DefaultSchedulerTest.java | 298 ++--
.../jobmanager/scheduler/LifoSetQueueTest.java | 128 ++
.../scheduler/TestDeploymentManager.java | 108 --
.../scheduler/TestInstanceManager.java | 194 ---
.../splitassigner/DefaultSplitAssignerTest.java | 25 +-
.../LocatableSplitAssignerTest.java | 25 +-
34 files changed, 2081 insertions(+), 2239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4fba186..b2af886 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -384,9 +384,9 @@ public final class ConfigConstants {
public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
/**
- * The default interval for TaskManager heart beats (2000 msecs).
+ * The default interval for TaskManager heart beats (5000 msecs).
*/
- public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 2000;
+ public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 5000;
/**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 9e93e67..707ecca 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
public class FileInputFormatTest {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 72e0696..d1ee262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -1000,4 +1000,8 @@ public final class ExecutionVertex {
return tdd;
}
+
+ public void handleException(Throwable t) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
new file mode 100644
index 0000000..ab33ca0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -0,0 +1,25 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.executiongraph;
+
+public class ExecutionVertex2 {
+
+
+
+ public void handleException(Throwable t) {
+ t.printStackTrace();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
deleted file mode 100644
index c626309..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedResource.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-
-/**
- * An allocated resource object unambiguously defines the
- * hardware resources which have been assigned to an {@link org.apache.flink.runtime.executiongraph.ExecutionVertex} for
- * executing a task. The allocated resource is comprised of an {@link Instance}
- * which identifies the node the task is scheduled to run on as well as an
- * {@link org.apache.flink.runtime.instance.AllocationID} which determines the resources the task is scheduled to
- * allocate within the node.
- * <p>
- * The class is thread-safe.
- *
- */
-public final class AllocatedResource {
-
- /**
- * The instance a task is scheduled to run on.
- */
- private final Instance instance;
-
- /**
- * The allocation ID identifying the resources within the instance
- * which the task is expected to run on.
- */
- private final AllocationID allocationID;
-
- /**
- * The set stores the execution vertices which are currently scheduled to run this resource.
- */
- private final Set<ExecutionVertex> assignedVertices = Collections
- .newSetFromMap(new ConcurrentHashMap<ExecutionVertex, Boolean>());
-
- /**
- * Constructs a new allocated resource object.
- *
- * @param instance
- * the instance a task is scheduled to run on.
- * @param allocationID
- * the allocation ID identifying the allocated resources within the instance
- */
- public AllocatedResource(final Instance instance, final AllocationID allocationID) {
- this.instance = instance;
- this.allocationID = allocationID;
- }
-
- /**
- * Returns the instance a task is scheduled to run on.
- *
- * @return the instance a task is scheduled to run on
- */
- public Instance getInstance() {
- return this.instance;
- }
-
- /**
- * Returns the allocation ID which identifies the resource allocated within the assigned instance.
- *
- * @return the allocation ID or <code>null</code> if the assigned instance is of type {@link DummyInstance}
- */
- public AllocationID getAllocationID() {
- return this.allocationID;
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (obj instanceof AllocatedResource) {
-
- final AllocatedResource allocatedResource = (AllocatedResource) obj;
- if (!this.instance.equals(allocatedResource.getInstance())) {
- return false;
- }
-
- if (this.allocationID == null) {
- if (allocatedResource.getAllocationID() != null) {
- return false;
- }
- } else {
- if (!this.allocationID.equals(allocatedResource.getAllocationID())) {
- return false;
- }
- }
-
- return true;
- }
-
- return false;
- }
-
-
- @Override
- public int hashCode() {
-
- if (this.allocationID == null) {
- return 0;
- }
-
- return this.allocationID.hashCode();
- }
-
- /**
- * Assigns the given execution vertex to this allocated resource.
- *
- * @param vertex
- * the vertex to assign to this resource
- */
- public void assignVertexToResource(final ExecutionVertex vertex) {
-
- if (!this.assignedVertices.add(vertex)) {
- throw new IllegalStateException("The vertex " + vertex + " has already been assigned to resource " + this);
- }
- }
-
- /**
- * Returns an iterator over all execution vertices currently assigned to this allocated resource.
- *
- * @return an iterator over all execution vertices currently assigned to this allocated resource
- */
- public Iterator<ExecutionVertex> assignedVertices() {
-
- return this.assignedVertices.iterator();
- }
-
- /**
- * Removes the given execution vertex from this allocated resource.
- *
- * @param vertex
- * the execution to be removed
- */
- public void removeVertexFromResource(final ExecutionVertex vertex) {
-
- if (!this.assignedVertices.remove(vertex)) {
- throw new IllegalStateException("The vertex " + vertex + " has not been assigned to resource " + this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index d85bf39..71af9db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -16,56 +16,67 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.instance;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
/**
- * An allocated slot is a part of an instance which is assigned to a job.
- * <p>
- * This class is thread-safe.
- *
+ * An allocated slot is the unit in which resources are allocated on instances.
*/
public class AllocatedSlot {
- /**
- * The allocation ID which identifies the resources occupied by this slot.
- */
- private final AllocationID allocationID;
+ /** The ID which identifies the resources occupied by this slot. */
+ private final ResourceId resourceId;
- /**
- * The ID of the job this slice belongs to.
- */
+ /** The ID of the job this slice belongs to. */
private final JobID jobID;
+
+ /** The instance on which the slot is allocated */
+ private final Instance instance;
+
+ /** The number of the slot on which the task is deployed */
+ private final int slotNumber;
- /**
- * Creates a new allocated slice on the given hosting instance.
- *
- * @param jobID
- * the ID of the job this slice belongs to
- */
- public AllocatedSlot(final JobID jobID) {
- this.allocationID = new AllocationID();
+ public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
+ this.resourceId = resourceId;
this.jobID = jobID;
+ this.instance = instance;
+ this.slotNumber = slotNumber;
}
+ // --------------------------------------------------------------------------------------------
+
/**
- * Returns the allocation ID of this slice.
- *
- * @return the allocation ID of this slice
- */
- public AllocationID getAllocationID() {
- return this.allocationID;
- }
-
- /**
- * Returns the ID of the job this allocated slice belongs to.
+ * Returns the ID of the job this allocated slot belongs to.
*
- * @return the ID of the job this allocated slice belongs to
+ * @return the ID of the job this allocated slot belongs to
*/
public JobID getJobID() {
return this.jobID;
}
+
+ public ResourceId getResourceId() {
+ return resourceId;
+ }
+
+ public Instance getInstance() {
+ return instance;
+ }
+
+ public int getSlotNumber() {
+ return slotNumber;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public void runTask(ExecutionVertex2 vertex) {
+
+ }
+
+ public void cancelResource() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index b19adbb..eca23c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -54,6 +59,9 @@ public class DefaultInstanceManager implements InstanceManager {
/** Set of hosts that were present once and have died */
private final Set<InstanceConnectionInfo> deadHosts;
+
+ /** Listeners that want to be notified about availability and disappearance of instances */
+ private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
/** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
private final long heartbeatTimeout;
@@ -106,7 +114,7 @@ public class DefaultInstanceManager implements InstanceManager {
this.cleanupStaleMachines.cancel();
for (Instance i : this.registeredHostsById.values()) {
- i.destroy();
+ i.markDead();
}
this.registeredHostsById.clear();
@@ -183,6 +191,9 @@ public class DefaultInstanceManager implements InstanceManager {
host.reportHeartBeat();
+ // notify all listeners (for example the scheduler)
+ notifyNewInstance(host);
+
return id;
}
}
@@ -204,48 +215,99 @@ public class DefaultInstanceManager implements InstanceManager {
// --------------------------------------------------------------------------------------------
+ public void addInstanceListener(InstanceListener listener) {
+ synchronized (this.instanceListeners) {
+ this.instanceListeners.add(listener);
+ }
+ }
+
+ public void removeInstanceListener(InstanceListener listener) {
+ synchronized (this.instanceListeners) {
+ this.instanceListeners.remove(listener);
+ }
+ }
+
+ private void notifyNewInstance(Instance instance) {
+ synchronized (this.instanceListeners) {
+ for (InstanceListener listener : this.instanceListeners) {
+ try {
+ listener.newInstanceAvailable(instance);
+ }
+ catch (Throwable t) {
+ LOG.error("Notification of new instance availability failed.", t);
+ }
+ }
+ }
+ }
+
+ private void notifyDeadInstance(Instance instance) {
+ synchronized (this.instanceListeners) {
+ for (InstanceListener listener : this.instanceListeners) {
+ try {
+ listener.instanceDied(instance);
+ }
+ catch (Throwable t) {
+ LOG.error("Notification of dead instance failed.", t);
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void checkForDeadInstances() {
+ final long now = System.currentTimeMillis();
+ final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
+
+ synchronized (DefaultInstanceManager.this.lock) {
+ if (DefaultInstanceManager.this.shutdown) {
+ return;
+ }
+
+ final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
+
+ // check all hosts whether they did not send heart-beat messages.
+ while (entries.hasNext()) {
+
+ final Map.Entry<InstanceID, Instance> entry = entries.next();
+ final Instance host = entry.getValue();
+
+ if (!host.isStillAlive(now, timeout)) {
+
+ // remove from the living
+ entries.remove();
+ registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
+
+ // add to the dead
+ deadHosts.add(host.getInstanceConnectionInfo());
+
+ host.markDead();
+
+ totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
+
+ LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
+ host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
+
+ // report to all listeners
+ notifyDeadInstance(host);
+ }
+ }
+ }
+ }
+ // --------------------------------------------------------------------------------------------
+
/**
* Periodic task that checks whether hosts have not sent their heart-beat
* messages and purges the hosts in this case.
*/
private final TimerTask cleanupStaleMachines = new TimerTask() {
-
@Override
public void run() {
-
- final long now = System.currentTimeMillis();
- final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
-
- synchronized (DefaultInstanceManager.this.lock) {
- if (DefaultInstanceManager.this.shutdown) {
- return;
- }
-
- final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-
- // check all hosts whether they did not send heart-beat messages.
- while (entries.hasNext()) {
-
- final Map.Entry<InstanceID, Instance> entry = entries.next();
- final Instance host = entry.getValue();
-
- if (!host.isStillAlive(now, timeout)) {
-
- // remove from the living
- entries.remove();
- registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
-
- // add to the dead
- deadHosts.add(host.getInstanceConnectionInfo());
-
- host.markDied();
-
- totalNumberOfAliveTaskSlots -= host.getNumberOfSlots();
-
- LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
- host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
- }
- }
+ try {
+ checkForDeadInstances();
+ }
+ catch (Throwable t) {
+ LOG.error("Checking for dead instances failed.", t);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
deleted file mode 100644
index af965c0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DummyInstance.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-/**
- * A DummyInstance is a stub implementation of the {@link Instance} interface.
- * Dummy instances are used to plan a job execution but must be replaced with
- * concrete instances before the job execution starts.
- *
- */
-public class DummyInstance extends Instance {
-
- private static int nextID = 0;
-
- private final String name;
-
- public static synchronized DummyInstance createDummyInstance() {
-
- return new DummyInstance(nextID++);
- }
-
- /**
- * Constructs a new dummy instance of the given instance type.
- *
- * @param id
- * the ID of the dummy instance
- */
- private DummyInstance(int id) {
- super(null, null, null, null, 0);
-
- this.name = "DummyInstance_" + Integer.toString(id);
- }
-
-
- @Override
- public String toString() {
-
- return this.name;
- }
-
-
- @Override
- public HardwareDescription getHardwareDescription() {
-
- throw new RuntimeException("getHardwareDescription is called on a DummyInstance");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index affbdd6..32d6572 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -121,6 +121,16 @@ public final class HardwareDescription implements IOReadableWritable, java.io.Se
}
// --------------------------------------------------------------------------------------------
+ // Utils
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("cores=%d, physMem=%d, heap=%d, managed=%d",
+ numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Factory
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index c895bbb..3d39c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -16,38 +16,30 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.instance;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
import java.util.Set;
-import java.util.Collection;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
+import org.eclipse.jetty.util.log.Log;
/**
* An instance represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
*/
public class Instance {
+ /** The lock on which to synchronize allocations and failure state changes */
+ private final Object instanceLock = new Object();
+
/** The connection info to connect to the task manager represented by this instance. */
private final InstanceConnectionInfo instanceConnectionInfo;
@@ -60,20 +52,21 @@ public class Instance {
/** The number of task slots available on the node */
private final int numberOfSlots;
- /**
- * Allocated slots on this instance
- */
- private final Map<AllocationID, AllocatedSlot> allocatedSlots = new HashMap<AllocationID, AllocatedSlot>();
+
+ private final Queue<Integer> availableSlots;
+
+ /** Allocated slots on this instance */
+ private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
- /**
- * Stores the RPC stub object for the instance's task manager.
- */
- private TaskOperationProtocol taskManager = null;
+ /** The RPC proxy to send calls to the task manager represented by this instance */
+ private volatile TaskOperationProtocol taskManager ;
/**
* Time when last heat beat has been received from the task manager running on this instance.
*/
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
+
+ private volatile boolean isDead;
/**
* Constructs an abstract instance object.
@@ -88,125 +81,108 @@ public class Instance {
this.instanceId = id;
this.resources = resources;
this.numberOfSlots = numberOfSlots;
+
+ this.availableSlots = new ArrayDeque<Integer>();
+ for (int i = 0; i < numberOfSlots; i++) {
+ this.availableSlots.add(i);
+ }
}
- /**
- * Creates or returns the RPC stub object for the instance's task manager.
- *
- * @return the RPC stub object for the instance's task manager
- * @throws IOException
- * thrown if the RPC stub object for the task manager cannot be created
- */
- private TaskOperationProtocol getTaskManagerProxy() throws IOException {
-
- if (this.taskManager == null) {
-
- this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
- new InetSocketAddress(getInstanceConnectionInfo().address(),
- getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+ public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+ TaskOperationProtocol tm = this.taskManager;
+
+ if (tm == null) {
+ synchronized (this) {
+ if (this.taskManager == null) {
+ this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+ new InetSocketAddress(getInstanceConnectionInfo().address(),
+ getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+ }
+ tm = this.taskManager;
+ }
}
-
- return this.taskManager;
+
+ return tm;
}
- /**
- * Destroys and removes the RPC stub object for this instance's task manager.
- */
+ /** Destroys and removes the RPC stub object for this instance's task manager. */
private void destroyTaskManagerProxy() {
-
- if (this.taskManager != null) {
- RPC.stopProxy(this.taskManager);
- this.taskManager = null;
+ synchronized (this) {
+ if (this.taskManager != null) {
+ try {
+ RPC.stopProxy(this.taskManager);
+ } catch (Throwable t) {
+ Log.debug("Error shutting down RPC proxy.", t);
+ }
+ }
}
}
- /**
- * Returns the instance's connection information object.
- *
- * @return the instance's connection information object
- */
- public final InstanceConnectionInfo getInstanceConnectionInfo() {
- return this.instanceConnectionInfo;
- }
- /**
- * Checks if all the libraries required to run the job with the given
- * job ID are available on this instance. Any libary that is missing
- * is transferred to the instance as a result of this call.
- *
- * @param jobID
- * the ID of the job whose libraries are to be checked for
- * @throws IOException
- * thrown if an error occurs while checking for the libraries
- */
- public synchronized void checkLibraryAvailability(final JobID jobID) throws IOException {
-
- // Now distribute the required libraries for the job
- String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
-
- if (requiredLibraries == null) {
- throw new IOException("No entry of required libraries for job " + jobID);
+
+ // --------------------------------------------------------------------------------------------
+ // Life and Death
+ // --------------------------------------------------------------------------------------------
+
+ public boolean isAlive() {
+ return !isDead;
+ }
+ /** Marks this instance as dead, cancels its allocated slots, and shuts down the task manager RPC proxy. Does nothing if already dead. */
+ public void markDead() {
+ if (isDead) {
+ return;
}
-
- LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
- request.setRequiredLibraries(requiredLibraries);
-
- // Send the request
- LibraryCacheProfileResponse response = null;
- response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
- // Check response and transfer libraries if necessary
- for (int k = 0; k < requiredLibraries.length; k++) {
- if (!response.isCached(k)) {
- LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
- getTaskManagerProxy().updateLibraryCache(update);
+ // Set the flag before touching the slots: allocateSlot() checks it under instanceLock and rejects new requests.
+ isDead = true;
+ // Cancel every slot first and clear the collection afterwards; clearing first would leave nothing to cancel.
+ synchronized (instanceLock) {
+ for (AllocatedSlot slot : allocatedSlots) {
+ slot.cancelResource();
}
+ this.allocatedSlots.clear();
}
+
+ destroyTaskManagerProxy();
}
-
- /**
- * Submits a list of tasks to the instance's {@link org.apache.flink.runtime.taskmanager.TaskManager}.
- *
- * @param tasks
- * the list of tasks to be submitted
- * @return the result of the submission attempt
- * @throws IOException
- * thrown if an error occurs while transmitting the task
- */
- public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks) throws IOException {
- return getTaskManagerProxy().submitTasks(tasks);
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ public InstanceID getId() {
+ return instanceId;
}
-
+
+ public HardwareDescription getResources() {
+ return this.resources;
+ }
+
+ public int getTotalNumberOfSlots() {
+ return numberOfSlots;
+ }
+
/**
- * Cancels the task identified by the given ID at the instance's
- * {@link org.apache.flink.runtime.taskmanager.TaskManager}.
+ * Returns the instance's connection information object.
*
- * @param id
- * the ID identifying the task to be canceled
- * @throws IOException
- * thrown if an error occurs while transmitting the request or receiving the response
- * @return the result of the cancel attempt
+ * @return the instance's connection information object
*/
- public synchronized TaskCancelResult cancelTask(final ExecutionVertexID id) throws IOException {
-
- return getTaskManagerProxy().cancelTask(id);
+ public InstanceConnectionInfo getInstanceConnectionInfo() {
+ return this.instanceConnectionInfo;
}
-
+
+ // --------------------------------------------------------------------------------------------
+ // Heartbeats
+ // --------------------------------------------------------------------------------------------
+
/**
- * Kills the task identified by the given ID at the instance's
- * {@link org.apache.flink.runtime.taskmanager.TaskManager}.
+ * Gets the timestamp of the last heartbeat.
*
- * @param id
- * the ID identifying the task to be killed
- * @throws IOException
- * thrown if an error occurs while transmitting the request or receiving the response
- * @return the result of the kill attempt
+ * @return The timestamp of the last heartbeat.
*/
- public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
- return getTaskManagerProxy().killTask(id);
+ public long getLastHeartBeat() {
+ return this.lastReceivedHeartBeat;
}
-
+
/**
* Updates the time of last received heart beat to the current system time.
*/
@@ -214,141 +190,53 @@ public class Instance {
this.lastReceivedHeartBeat = System.currentTimeMillis();
}
- public boolean isStillAlive(long now, long cleanUpInterval) {
- return this.lastReceivedHeartBeat + cleanUpInterval > now;
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- // Fall back since dummy instances do not have a instanceConnectionInfo
- if (this.instanceConnectionInfo == null) {
- return super.equals(obj);
- }
-
- if (!(obj instanceof Instance)) {
- return false;
- }
-
- final Instance abstractInstance = (Instance) obj;
-
- return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
- }
-
-
- @Override
- public int hashCode() {
-
- // Fall back since dummy instances do not have a instanceConnectionInfo
- if (this.instanceConnectionInfo == null) {
- return super.hashCode();
- }
-
- return this.instanceConnectionInfo.hashCode();
- }
-
- /**
- * Triggers the remote task manager to print out the current utilization of its read and write buffers to its logs.
- *
- * @throws IOException
- * thrown if an error occurs while transmitting the request
- */
- public synchronized void logBufferUtilization() throws IOException {
-
- getTaskManagerProxy().logBufferUtilization();
- }
-
- /**
- * Kills the task manager running on this instance. This method is mainly intended to test and debug Nephele's fault
- * tolerance mechanisms.
- *
- * @throws IOException
- * thrown if an error occurs while transmitting the request
- */
- public synchronized void killTaskManager() throws IOException {
-
- getTaskManagerProxy().killTaskManager();
- }
-
- /**
- * Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
- *
- * @param channelIDs
- * the channel IDs identifying the cache entries to invalidate
- * @throws IOException
- * thrown if an error occurs during this remote procedure call
- */
- public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
- getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
- }
-
/**
- * Destroys all RPC stub objects attached to this instance.
+ * Checks whether the last heartbeat occurred within the last {@code cleanUpInterval} milliseconds
+ * before the given timestamp {@code now}.
+ *
+ * @param now The timestamp representing the current time.
+ * @param cleanUpInterval The maximum time (in msecs) that the last heartbeat may lie in the past.
+ * @return True, if this instance is considered alive, false otherwise.
*/
- public synchronized void destroyProxies() {
-
- destroyTaskManagerProxy();
-
- }
-
- public int getNumberOfSlots() {
- return numberOfSlots;
- }
-
- public int getNumberOfAvailableSlots() { return numberOfSlots - allocatedSlots.size(); }
-
- public synchronized AllocatedResource allocateSlot(JobID jobID) throws InstanceException{
- if(allocatedSlots.size() < numberOfSlots){
- AllocatedSlot slot = new AllocatedSlot(jobID);
-
- allocatedSlots.put(slot.getAllocationID(), slot);
- return new AllocatedResource(this,slot.getAllocationID());
- }else{
- throw new InstanceException("Overbooking instance " + instanceConnectionInfo + ".");
- }
- }
-
- public synchronized void releaseSlot(AllocationID allocationID) {
- if(allocatedSlots.containsKey(allocationID)){
- allocatedSlots.remove(allocationID);
- }else{
- throw new RuntimeException("There is no slot registered with allocation ID " + allocationID + ".");
- }
- }
-
- public Collection<AllocatedSlot> getAllocatedSlots() {
- return allocatedSlots.values();
- }
-
- public Collection<AllocatedSlot> removeAllocatedSlots() {
- Collection<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
-
- for(AllocatedSlot slot : slots){
- releaseSlot(slot.getAllocationID());
- }
-
- return slots;
- }
-
- public long getLastHeartBeat() {
- return this.lastReceivedHeartBeat;
+ public boolean isStillAlive(long now, long cleanUpInterval) {
+ return this.lastReceivedHeartBeat + cleanUpInterval > now;
}
+ // --------------------------------------------------------------------------------------------
+ // Resource allocation
+ // --------------------------------------------------------------------------------------------
- public void markDied() {
-
+ public AllocatedSlot allocateSlot(JobID jobID, ResourceId resourceId) throws InstanceDiedException {
+ synchronized (instanceLock) {
+ if (isDead) {
+ throw new InstanceDiedException(this);
+ }
+
+ Integer nextSlot = availableSlots.poll();
+ if (nextSlot == null) {
+ return null;
+ } else {
+ AllocatedSlot slot = new AllocatedSlot(jobID, resourceId, this, nextSlot);
+ allocatedSlots.add(slot);
+ return slot;
+ }
+ }
}
- public void destroy() {
-
+ public int getNumberOfAvailableSlots() {
+ return this.availableSlots.size();
}
- public InstanceID getId() {
- return instanceId;
+ public boolean hasResourcesAvailable() {
+ return !isDead && getNumberOfAvailableSlots() > 0;
}
- public HardwareDescription getResources() {
- return this.resources;
+ // --------------------------------------------------------------------------------------------
+ // Standard Utilities
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "Instance (" + this.instanceConnectionInfo + "), resources: " + this.resources + ", numberOfSlots=" + numberOfSlots;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index daf7e0d..4cbb2a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -183,7 +183,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
}
// --------------------------------------------------------------------------------------------
- // Serialization
+ // Utilities
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
new file mode 100644
index 0000000..42b9817
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
@@ -0,0 +1,34 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.instance;
+
+/**
+ * A special exception signaling that an attempted operation on an instance is not possible,
+ * because the instance has died.
+ */
+public class InstanceDiedException extends Exception {
+ private static final long serialVersionUID = -4917918318403135745L;
+
+ private final Instance instance;
+
+ public InstanceDiedException(Instance instance) {
+ this.instance = instance;
+ }
+
+ public Instance getInstance() {
+ return instance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
deleted file mode 100644
index 61d5868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-/**
- * An instance exception is thrown if the allocation, assignment or deallocation of an instance fails.
- *
- */
-public class InstanceException extends Exception {
-
- /**
- * The generated serial UID.
- */
- private static final long serialVersionUID = 3463832262505896962L;
-
- /**
- * Constructs a new instance exception with the given error message.
- *
- * @param errorMsg
- * the error message to be included in the exception.
- */
- public InstanceException(String errorMsg) {
- super(errorMsg);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
index fbdef54..76e63b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
@@ -16,38 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.instance;
-import java.util.List;
-
-import org.apache.flink.runtime.jobgraph.JobID;
-
/**
- * Classes implementing the {@link InstanceListener} interface can be notified about
- * the availability or the unexpected failure of an instance.
- *
+ * Classes implementing the InstanceListener interface can be notified about
+ * the availability and disappearance of instances.
*/
public interface InstanceListener {
- /**
- * Called if one or more requested resources have become available.
- *
- * @param jobID
- * the ID of the job the initial request has been triggered for
- * @param allocatedResources
- * the resources which have been allocated as a response to the initial request
- */
- void resourcesAllocated(JobID jobID, List<AllocatedResource> allocatedResources);
-
- /**
- * Called if one or more allocated resources assigned to at least one job have died unexpectedly.
- *
- * @param jobID
- * the ID of the job the instance is used for
- * @param allocatedResource
- * the allocated resources which are affected by the instance death
- */
- void allocatedResourcesDied(JobID jobID, List<AllocatedResource> allocatedResource);
-
+ void newInstanceAvailable(Instance instance);
+
+ void instanceDied(Instance instance);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
deleted file mode 100644
index 9e85a83..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceNotifier.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import java.util.List;
-
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * This class is an auxiliary class to send the notification
- * about the availability of an {@link org.apache.flink.runtime.instance.Instance} to the given {@link
- * InstanceListener} object. The notification must be sent from
- * a separate thread, otherwise the atomic operation of requesting an instance
- * for a vertex and switching to the state ASSIGNING could not be guaranteed.
- * This class is thread-safe.
- *
- */
-public class InstanceNotifier extends Thread {
-
- /**
- * The {@link InstanceListener} object to send the notification to.
- */
- private final InstanceListener instanceListener;
-
- /**
- * The ID of the job the notification refers to.
- */
- private final JobID jobID;
-
- /**
- * The allocated resources the notification refers to.
- */
- private final List<AllocatedResource> allocatedResources;
-
- /**
- * Constructs a new instance notifier object.
- *
- * @param instanceListener
- * the listener to send the notification to
- * @param jobID
- * the ID of the job the notification refers to
- * @param allocatedResources
- * the resources with has been allocated for the job
- */
- public InstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
- final List<AllocatedResource> allocatedResources) {
- this.instanceListener = instanceListener;
- this.jobID = jobID;
- this.allocatedResources = allocatedResources;
- }
-
-
- @Override
- public void run() {
-
- this.instanceListener.resourcesAllocated(this.jobID, this.allocatedResources);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
deleted file mode 100644
index d0bbdca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultExecutionListener.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionPipeline;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-public class DefaultExecutionListener implements ExecutionListener {
-
- /**
- * The instance of the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler}.
- */
- private final DefaultScheduler scheduler;
-
- /**
- * The {@link ExecutionVertex} this wrapper object belongs to.
- */
- private final ExecutionVertex executionVertex;
-
- /**
- * Constructs a new wrapper object for the given {@link ExecutionVertex}.
- *
- * @param scheduler
- * the instance of the {@link DefaultScheduler}
- * @param executionVertex
- * the {@link ExecutionVertex} the received notification refer to
- */
- public DefaultExecutionListener(final DefaultScheduler scheduler, final ExecutionVertex executionVertex) {
- this.scheduler = scheduler;
- this.executionVertex = executionVertex;
- }
-
-
- @Override
- public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
- final ExecutionState newExecutionState, final String optionalMessage) {
-
- final ExecutionGraph eg = this.executionVertex.getExecutionGraph();
-
- // Check if we can deploy a new pipeline.
- if (newExecutionState == ExecutionState.FINISHING) {
-
- final ExecutionPipeline pipeline = this.executionVertex.getExecutionPipeline();
- if (!pipeline.isFinishing()) {
- // Some tasks of the pipeline are still running
- return;
- }
-
- // Find another vertex in the group which is still in SCHEDULED state and get its pipeline.
- final ExecutionGroupVertex groupVertex = this.executionVertex.getGroupVertex();
- for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
- final ExecutionVertex groupMember = groupVertex.getGroupMember(i);
- if (groupMember.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
-
- final ExecutionPipeline pipelineToBeDeployed = groupMember.getExecutionPipeline();
- pipelineToBeDeployed.setAllocatedResource(this.executionVertex.getAllocatedResource());
- pipelineToBeDeployed.updateExecutionState(ExecutionState.ASSIGNED);
-
- this.scheduler.deployAssignedPipeline(pipelineToBeDeployed);
- return;
- }
- }
- }
-
- if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {
-
- synchronized (eg) {
-
- if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {
-
- if (eg.getJobStatus() == InternalJobStatus.FAILING) {
- return;
- }
-
- this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
-
- // Run through the deployment procedure
- this.scheduler.deployAssignedVertices(this.executionVertex);
- return;
- }
- }
- }
-
- if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
- || newExecutionState == ExecutionState.FAILED) {
- // Check if instance can be released
- this.scheduler.checkAndReleaseAllocatedResource(eg, this.executionVertex.getAllocatedResource());
- }
- }
-
-
- @Override
- public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
-
- @Override
- public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
-
- @Override
- public int getPriority() {
-
- return 0;
- }
-}